Repository: olingo-odata4 Updated Branches: refs/heads/master 15e7718a0 -> 0b05798cd
[OLINGO-246, OLINGO-248] provided async batch support Project: http://git-wip-us.apache.org/repos/asf/olingo-odata4/repo Commit: http://git-wip-us.apache.org/repos/asf/olingo-odata4/commit/0b05798c Tree: http://git-wip-us.apache.org/repos/asf/olingo-odata4/tree/0b05798c Diff: http://git-wip-us.apache.org/repos/asf/olingo-odata4/diff/0b05798c Branch: refs/heads/master Commit: 0b05798cd943529bf0099cb05760f2baf1c6cf9b Parents: 15e7718 Author: fmartelli <[email protected]> Authored: Mon Apr 28 13:10:40 2014 +0200 Committer: fmartelli <[email protected]> Committed: Mon Apr 28 13:10:40 2014 +0200 ---------------------------------------------------------------------- .../java/org/apache/olingo/fit/V4Services.java | 71 +++++++++-- .../request/v4/AsyncBatchRequestWrapper.java | 48 ++++++++ .../request/v4/AsyncRequestFactory.java | 3 + .../response/v4/AsyncResponseWrapper.java | 11 ++ .../batch/ODataChangesetResponseItem.java | 13 +- .../batch/ODataRetrieveResponseItem.java | 7 +- .../request/batch/v4/ODataBatchRequestImpl.java | 3 + .../v4/AsyncBatchRequestWrapperImpl.java | 119 +++++++++++++++++++ .../request/v4/AsyncRequestFactoryImpl.java | 7 ++ .../request/v4/AsyncRequestWrapperImpl.java | 70 ++++++----- .../response/AbstractODataResponse.java | 6 +- .../response/v4/AsyncResponseImpl.java | 27 +++++ .../client/core/it/v4/BatchTestITCase.java | 84 ++++++++++++- 13 files changed, 415 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/fit/src/main/java/org/apache/olingo/fit/V4Services.java ---------------------------------------------------------------------- diff --git a/fit/src/main/java/org/apache/olingo/fit/V4Services.java b/fit/src/main/java/org/apache/olingo/fit/V4Services.java index 25eac89..38d8696 100644 --- a/fit/src/main/java/org/apache/olingo/fit/V4Services.java +++ b/fit/src/main/java/org/apache/olingo/fit/V4Services.java @@ -51,12 +51,15 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.cxf.interceptor.InInterceptors; import org.apache.cxf.jaxrs.ext.multipart.Attachment; +import org.apache.cxf.jaxrs.ext.multipart.Multipart; +import org.apache.cxf.jaxrs.ext.multipart.MultipartBody; import org.apache.olingo.commons.api.data.CollectionValue; import org.apache.olingo.commons.api.data.ResWrap; import org.apache.olingo.commons.api.data.Entry; import org.apache.olingo.commons.api.data.Feed; import org.apache.olingo.commons.api.data.Property; import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion; +import org.apache.olingo.commons.api.format.ContentType; import org.apache.olingo.commons.core.data.AtomEntryImpl; import org.apache.olingo.commons.core.data.AtomFeedImpl; import org.apache.olingo.commons.core.data.AtomPropertyImpl; @@ -152,6 +155,54 @@ public class V4Services extends AbstractServices { } } + @POST + @Path("/async/$batch") + public Response async( + @Context final UriInfo uriInfo, + @HeaderParam("Prefer") @DefaultValue(StringUtils.EMPTY) final String prefer, + final @Multipart MultipartBody attachment) { + + try { + final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + bos.write("HTTP/1.1 200 Ok".getBytes()); + bos.write(CRLF); + bos.write("OData-Version: 4.0".getBytes()); + bos.write(CRLF); + bos.write(("Content-Type: " + ContentType.APPLICATION_OCTET_STREAM + ";boundary=" + BOUNDARY).getBytes()); + bos.write(CRLF); + bos.write(CRLF); + + bos.write(("--" + BOUNDARY).getBytes()); + bos.write(CRLF); + bos.write("Content-Type: application/http".getBytes()); + bos.write(CRLF); + bos.write("Content-Transfer-Encoding: binary".getBytes()); + bos.write(CRLF); + bos.write(CRLF); + + bos.write("HTTP/1.1 202 Accepted".getBytes()); + bos.write(CRLF); + bos.write("Location: http://service-root/async-monitor".getBytes()); + bos.write(CRLF); + bos.write("Retry-After: 10".getBytes()); + bos.write(CRLF); + bos.write(CRLF); + bos.write(("--" + BOUNDARY + "--").getBytes()); + bos.write(CRLF); + + final UUID uuid = UUID.randomUUID(); + providedAsync.put(uuid.toString(), bos.toString(Constants.ENCODING.toString())); + + bos.flush(); + bos.close(); + + return xml.createAsyncResponse( + uriInfo.getRequestUri().toASCIIString().replace("async/$batch", "") + "monitor/" + uuid.toString()); + } catch (Exception e) { + return xml.createFaultResponse(Accept.JSON.toString(), e); + } + } + @GET @Path("/async/{name}") public Response async( @@ -568,7 +619,7 @@ public class V4Services extends AbstractServices { return utils.getValue().createResponse( FSManager.instance(version).readFile(Constants.get(version, ConstantKey.REF) - + File.separatorChar + filename, utils.getKey()), + + File.separatorChar + filename, utils.getKey()), null, utils.getKey()); } catch (Exception e) { @@ -590,7 +641,7 @@ public class V4Services extends AbstractServices { final Response response = getEntityInternal(uriInfo.getRequestUri().toASCIIString(), - accept, entitySetName, entityId, accept, StringUtils.EMPTY, StringUtils.EMPTY, false); + accept, entitySetName, entityId, accept, StringUtils.EMPTY, StringUtils.EMPTY, false); return response.getStatus() >= 400 ? postNewEntity(uriInfo, accept, contentType, prefer, entitySetName, changes) : super.patchEntity(uriInfo, accept, contentType, prefer, ifMatch, entitySetName, entityId, changes); @@ -688,8 +739,8 @@ public class V4Services extends AbstractServices { } else { final ResWrap<JSONEntryImpl> jcontainer = mapper.readValue(IOUtils.toInputStream(entity, Constants.ENCODING), - new TypeReference<JSONEntryImpl>() { - }); + new TypeReference<JSONEntryImpl>() { + }); entry = dataBinder.toAtomEntry(jcontainer.getPayload()); @@ -787,7 +838,7 @@ public class V4Services extends AbstractServices { final ResWrap<JSONEntryImpl> jsonContainer = mapper.readValue( IOUtils.toInputStream(changes, Constants.ENCODING), new TypeReference<JSONEntryImpl>() { - }); + }); jsonContainer.getPayload().setType(typeInfo.getFullQualifiedName().toString()); entryChanges = dataBinder.toAtomEntry(jsonContainer.getPayload()); } @@ -820,7 +871,7 @@ public class V4Services extends AbstractServices { // 1. Fetch the contained entity to be removed final InputStream entry = FSManager.instance(version). readFile(containedPath(entityId, containedEntitySetName). - append('(').append(containedEntityId).append(')').toString(), Accept.ATOM); + append('(').append(containedEntityId).append(')').toString(), Accept.ATOM); final ResWrap<AtomEntryImpl> container = atomDeserializer.read(entry, AtomEntryImpl.class); // 2. Remove the contained entity @@ -1049,8 +1100,8 @@ public class V4Services extends AbstractServices { } else { final ResWrap<JSONPropertyImpl> paramContainer = mapper.readValue(IOUtils.toInputStream(param, Constants.ENCODING), - new TypeReference<JSONPropertyImpl>() { - }); + new TypeReference<JSONPropertyImpl>() { + }); property = paramContainer.getPayload(); } @@ -1091,8 +1142,8 @@ public class V4Services extends AbstractServices { } else { final ResWrap<JSONPropertyImpl> paramContainer = mapper.readValue(IOUtils.toInputStream(param, Constants.ENCODING), - new TypeReference<JSONPropertyImpl>() { - }); + new TypeReference<JSONPropertyImpl>() { + }); property = paramContainer.getPayload(); } http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncBatchRequestWrapper.java ---------------------------------------------------------------------- diff --git a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncBatchRequestWrapper.java b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncBatchRequestWrapper.java new file mode 100644 index 0000000..9efa0fd --- /dev/null +++ b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncBatchRequestWrapper.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.olingo.client.api.communication.request.v4; + +import org.apache.olingo.client.api.communication.request.batch.ODataChangeset; +import org.apache.olingo.client.api.communication.request.batch.ODataRetrieve; +import org.apache.olingo.client.api.communication.request.batch.v4.ODataOutsideUpdate; +import org.apache.olingo.client.api.communication.response.ODataBatchResponse; + +public interface AsyncBatchRequestWrapper extends AsyncRequestWrapper<ODataBatchResponse> { + + /** + * Gets a changeset batch item instance. A changeset can be submitted embedded into a batch request only. + * + * @return ODataChangeset instance. + */ + ODataChangeset addChangeset(); + + /** + * Gets a retrieve batch item instance. A retrieve item can be submitted embedded into a batch request only. + * + * @return ODataRetrieve instance. + */ + ODataRetrieve addRetrieve(); + + /** + * Gets an outside change batch item instance. An outside item can be submitted embedded into a batch request only. + * + * @return ODataOutsideUpdate instance. + */ + ODataOutsideUpdate addOutsideUpdate(); +} http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java ---------------------------------------------------------------------- diff --git a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java index 9fd06dc..17b510f 100644 --- a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java +++ b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java @@ -19,10 +19,13 @@ package org.apache.olingo.client.api.communication.request.v4; import org.apache.olingo.client.api.communication.request.ODataRequest; +import org.apache.olingo.client.api.communication.request.batch.v4.ODataBatchRequest; import org.apache.olingo.client.api.communication.response.ODataResponse; @SuppressWarnings("unchecked") public interface AsyncRequestFactory { <R extends ODataResponse> AsyncRequestWrapper<R> getAsyncRequestWrapper(final ODataRequest odataRequest); + + AsyncBatchRequestWrapper getAsyncBatchRequestWrapper(final ODataBatchRequest odataRequest); } http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java ---------------------------------------------------------------------- diff --git a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java index 87b6904..04d3688 100644 --- a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java +++ b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java @@ -18,6 +18,7 @@ */ package org.apache.olingo.client.api.communication.response.v4; +import java.net.URI; import org.apache.olingo.client.api.communication.response.ODataDeleteResponse; import org.apache.olingo.client.api.communication.response.ODataResponse; @@ -50,6 +51,16 @@ public interface AsyncResponseWrapper<R extends ODataResponse> { R getODataResponse(); /** + * Specifies the location for the next monitor check. + * <br /> + * Overrides the location value retrieved among headers and nullifies the previous valid response (if exists). + * + * @param uri monitor location. + * @return the current async response wrapper. + */ + AsyncResponseWrapper<R> forceNextMonitorCheck(URI uri); + + /** * DeleteA DELETE request sent to the status monitor resource requests that the asynchronous processing be canceled. A * 200 OK or to a 204 No Content response indicates that the asynchronous processing has been successfully canceled. * http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java ---------------------------------------------------------------------- diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java index 53eeb16..51f3a48 100644 --- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java +++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java @@ -25,6 +25,7 @@ import org.apache.olingo.client.api.ODataBatchConstants; import org.apache.olingo.client.api.communication.response.ODataResponse; import static org.apache.olingo.client.core.communication.request.batch.AbstractODataBatchResponseItem.LOG; import org.apache.olingo.client.core.communication.response.batch.ODataBatchErrorResponse; +import org.apache.olingo.client.core.communication.response.v4.AsyncResponseImpl; /** * Changeset wrapper for the corresponding batch item. @@ -116,11 +117,15 @@ public class ODataChangesetResponseItem extends AbstractODataBatchResponseItem { final Map.Entry<Integer, String> responseLine = ODataBatchUtilities.readResponseLine(batchLineIterator); LOG.debug("Retrieved item response {}", responseLine); - if (responseLine.getKey() >= 400) { - // generate error response - final Map<String, Collection<String>> headers = ODataBatchUtilities.readHeaders(batchLineIterator); - LOG.debug("Retrieved item headers {}", headers); + final Map<String, Collection<String>> headers = ODataBatchUtilities.readHeaders(batchLineIterator); + LOG.debug("Retrieved item headers {}", headers); + if (responseLine.getKey() == 202) { + // generate async response + current = new AsyncResponseImpl(responseLine, headers, batchLineIterator, boundary); + return current; + } else if (responseLine.getKey() >= 400) { + // generate error response current = new ODataBatchErrorResponse(responseLine, headers, batchLineIterator, boundary); return current; } http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java ---------------------------------------------------------------------- diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java index 8919ffe..a180b37 100644 --- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java +++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java @@ -24,6 +24,7 @@ import java.util.NoSuchElementException; import org.apache.olingo.client.api.communication.response.ODataResponse; import static org.apache.olingo.client.core.communication.request.batch.AbstractODataBatchResponseItem.LOG; import org.apache.olingo.client.core.communication.response.batch.ODataBatchErrorResponse; +import org.apache.olingo.client.core.communication.response.v4.AsyncResponseImpl; /** * Retrieve response wrapper for the corresponding batch item. @@ -58,7 +59,11 @@ public class ODataRetrieveResponseItem extends AbstractODataBatchResponseItem { final Map<String, Collection<String>> headers = ODataBatchUtilities.readHeaders(batchLineIterator); LOG.debug("Retrieved item headers {}", headers); - if (responseLine.getKey() >= 400) { + if (responseLine.getKey() == 202) { + // generate async response + current = new AsyncResponseImpl(responseLine, headers, batchLineIterator, boundary); + breakingitem = true; + } else if (responseLine.getKey() >= 400) { // generate error response current = new ODataBatchErrorResponse(responseLine, headers, batchLineIterator, boundary); breakingitem = true; http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java ---------------------------------------------------------------------- diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java index 648cb35..a445013 100644 --- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java +++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java @@ -122,6 +122,9 @@ public class ODataBatchRequestImpl */ protected class ODataBatchResponseImpl extends AbstractODataResponse implements ODataBatchResponse { + private ODataBatchResponseImpl() { + } + /** * Constructor. * http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncBatchRequestWrapperImpl.java ---------------------------------------------------------------------- diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncBatchRequestWrapperImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncBatchRequestWrapperImpl.java new file mode 100644 index 0000000..8a3d903 --- /dev/null +++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncBatchRequestWrapperImpl.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.olingo.client.core.communication.request.v4; + +import java.net.URI; +import java.util.Collection; +import org.apache.commons.io.IOUtils; +import org.apache.olingo.client.api.communication.header.HeaderName; +import org.apache.olingo.client.api.communication.header.ODataPreferences; +import org.apache.olingo.client.api.communication.request.batch.ODataChangeset; +import org.apache.olingo.client.api.communication.request.batch.ODataRetrieve; +import org.apache.olingo.client.api.communication.request.batch.v4.BatchStreamManager; +import org.apache.olingo.client.api.communication.request.batch.v4.ODataBatchRequest; +import org.apache.olingo.client.api.communication.request.batch.v4.ODataOutsideUpdate; +import org.apache.olingo.client.api.communication.request.v4.AsyncBatchRequestWrapper; +import org.apache.olingo.client.api.communication.response.ODataBatchResponse; +import org.apache.olingo.client.api.communication.response.v4.AsyncResponseWrapper; +import org.apache.olingo.client.api.v4.ODataClient; +import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion; + +public class AsyncBatchRequestWrapperImpl extends AsyncRequestWrapperImpl<ODataBatchResponse> + implements AsyncBatchRequestWrapper { + + private BatchStreamManager batchStreamManager; + + protected AsyncBatchRequestWrapperImpl(final ODataClient odataClient, final ODataBatchRequest odataRequest) { + super(odataClient, odataRequest); + batchStreamManager = odataRequest.execute(); + } + + /** + * {@inheritDoc} + */ + @Override + public ODataChangeset addChangeset() { + return batchStreamManager.addChangeset(); + } + + /** + * {@inheritDoc} + */ + @Override + public ODataRetrieve addRetrieve() { + return batchStreamManager.addRetrieve(); + } + + /** + * {@inheritDoc} + */ + @Override + public ODataOutsideUpdate addOutsideUpdate() { + return batchStreamManager.addOutsideUpdate(); + } + + @Override + public AsyncResponseWrapper<ODataBatchResponse> execute() { + return new AsyncResponseWrapperImpl(batchStreamManager.getResponse()); + } + + public class AsyncResponseWrapperImpl + extends AsyncRequestWrapperImpl<ODataBatchResponse>.AsyncResponseWrapperImpl { + + /** + * Constructor. + * + * @param res OData batch response. + */ + public AsyncResponseWrapperImpl(final ODataBatchResponse res) { + super(); + + if (res.getStatusCode() == 202) { + retrieveMonitorDetails(res); + } else { + response = res; + } + } + + private void retrieveMonitorDetails(final ODataBatchResponse res) { + Collection<String> headers = res.getHeader(HeaderName.location.toString()); + if (headers == null || headers.isEmpty()) { + throw new AsyncRequestException("Invalid async request response. Monitor URL not found"); + } else { + this.location = URI.create(headers.iterator().next()); + } + + headers = res.getHeader(HeaderName.retryAfter.toString()); + if (headers != null && !headers.isEmpty()) { + this.retryAfter = Integer.parseInt(headers.iterator().next()); + } + + headers = res.getHeader(HeaderName.preferenceApplied.toString()); + if (headers != null && !headers.isEmpty()) { + for (String header : headers) { + if (header.equalsIgnoreCase(new ODataPreferences(ODataServiceVersion.V40).respondAsync())) { + preferenceApplied = true; + } + } + } + + IOUtils.closeQuietly(res.getRawResponse()); + } + } +} http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java ---------------------------------------------------------------------- diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java index d6905dc..d5331bc 100644 --- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java +++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java @@ -19,6 +19,8 @@ package org.apache.olingo.client.core.communication.request.v4; import org.apache.olingo.client.api.communication.request.ODataRequest; +import org.apache.olingo.client.api.communication.request.batch.v4.ODataBatchRequest; +import org.apache.olingo.client.api.communication.request.v4.AsyncBatchRequestWrapper; import org.apache.olingo.client.api.communication.request.v4.AsyncRequestFactory; import org.apache.olingo.client.api.communication.request.v4.AsyncRequestWrapper; import org.apache.olingo.client.api.communication.response.ODataResponse; @@ -39,4 +41,9 @@ public class AsyncRequestFactoryImpl implements AsyncRequestFactory { public <R extends ODataResponse> AsyncRequestWrapper<R> getAsyncRequestWrapper(final ODataRequest odataRequest) { return new AsyncRequestWrapperImpl<R>(client, odataRequest); } + + @Override + public AsyncBatchRequestWrapper getAsyncBatchRequestWrapper(final ODataBatchRequest odataRequest) { + return new AsyncBatchRequestWrapperImpl(client, odataRequest); + } } http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java ---------------------------------------------------------------------- diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java index 57305cd..7aef8a9 100644 --- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java +++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java @@ -41,7 +41,6 @@ import org.apache.olingo.client.api.communication.response.v4.AsyncResponseWrapp import org.apache.olingo.client.api.http.HttpClientException; import org.apache.olingo.client.api.http.HttpMethod; import org.apache.olingo.client.api.v4.ODataClient; -import org.apache.olingo.client.core.communication.header.ODataHeadersImpl; import org.apache.olingo.client.core.communication.request.AbstractODataRequest; import org.apache.olingo.client.core.communication.request.AbstractRequest; import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion; @@ -49,34 +48,29 @@ import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion; public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRequest implements AsyncRequestWrapper<R> { - private static final int MAX_RETRY = 5; + protected static final int MAX_RETRY = 5; - private final ODataClient odataClient; + protected final ODataClient odataClient; /** * Request to be wrapped. */ - private final ODataRequest odataRequest; + protected final ODataRequest odataRequest; /** * HTTP client. */ - private final HttpClient httpClient; + protected final HttpClient httpClient; /** * HTTP request. */ - private final HttpUriRequest request; - - /** - * OData request header. - */ - private final ODataHeadersImpl odataHeaders; + protected final HttpUriRequest request; /** * Target URI. */ - private final URI uri; + protected final URI uri; protected AsyncRequestWrapperImpl(final ODataClient odataClient, final ODataRequest odataRequest) { this.odataRequest = odataRequest; @@ -88,9 +82,6 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe this.odataClient = odataClient; final HttpMethod method = odataRequest.getMethod(); - // initialize default headers - this.odataHeaders = (ODataHeadersImpl) odataClient.getVersionHeaders(); - // target uri this.uri = odataRequest.getURI(); @@ -104,19 +95,19 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe } @Override - public AsyncRequestWrapper<R> wait(final int waitInSeconds) { + public final AsyncRequestWrapper<R> wait(final int waitInSeconds) { extendHeader(HeaderName.prefer.toString(), new ODataPreferences(ODataServiceVersion.V40).wait(waitInSeconds)); return this; } @Override - public AsyncRequestWrapper<R> callback(URI url) { + public final AsyncRequestWrapper<R> callback(URI url) { extendHeader(HeaderName.prefer.toString(), new ODataPreferences(ODataServiceVersion.V40).callback(url.toASCIIString())); return this; } - private void extendHeader(final String headerName, final String headerValue) { + protected final void extendHeader(final String headerName, final String headerValue) { final StringBuilder extended = new StringBuilder(); if (this.odataRequest.getHeaderNames().contains(headerName)) { extended.append(this.odataRequest.getHeader(headerName)).append(", "); @@ -130,7 +121,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe return new AsyncResponseWrapperImpl(doExecute()); } - private HttpResponse doExecute() { + protected HttpResponse doExecute() { // Add all available headers for (String key : odataRequest.getHeaderNames()) { final String value = odataRequest.getHeader(key); @@ -143,13 +134,16 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe public class AsyncResponseWrapperImpl implements AsyncResponseWrapper<R> { - private URI location = null; + protected URI location = null; - private R response = null; + protected R response = null; - private int retryAfter = 5; + protected int retryAfter = 5; - private boolean preferenceApplied = false; + protected boolean preferenceApplied = false; + + public AsyncResponseWrapperImpl() { + } /** * Constructor. @@ -159,7 +153,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe @SuppressWarnings("unchecked") public AsyncResponseWrapperImpl(final HttpResponse res) { if (res.getStatusLine().getStatusCode() == 202) { - retrieveMonitorDetails(res, true); + retrieveMonitorDetails(res); } else { response = (R) ((AbstractODataRequest<?>) odataRequest).getResponseTemplate().initFromHttpResponse(res); } @@ -177,7 +171,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe final HttpResponse res = checkMonitor(location); if (res.getStatusLine().getStatusCode() == 202) { - retrieveMonitorDetails(res, false); + retrieveMonitorDetails(res); } else { response = instantiateResponse(res); } @@ -219,18 +213,34 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe return response; } + /** + * {@inheritDoc} + */ @Override public ODataDeleteResponse delete() { final ODataDeleteRequest deleteRequest = odataClient.getCUDRequestFactory().getDeleteRequest(location); return deleteRequest.execute(); } + /** + * {@inheritDoc} + */ @Override public AsyncResponseWrapper<ODataDeleteResponse> asyncDelete() { return odataClient.getAsyncRequestFactory().<ODataDeleteResponse>getAsyncRequestWrapper( odataClient.getCUDRequestFactory().getDeleteRequest(location)).execute(); } + /** + * {@inheritDoc} + */ + @Override + public AsyncResponseWrapper<R> forceNextMonitorCheck(final URI uri) { + this.location = uri; + this.response = null; + return this; + } + @SuppressWarnings("unchecked") private R instantiateResponse(final HttpResponse res) { R odataResponse; @@ -246,7 +256,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe return odataResponse; } - private void retrieveMonitorDetails(final HttpResponse res, final boolean includePreferenceApplied) { + private void retrieveMonitorDetails(final HttpResponse res) { Header[] headers = res.getHeaders(HeaderName.location.toString()); if (ArrayUtils.isNotEmpty(headers)) { this.location = URI.create(headers[0].getValue()); @@ -276,7 +286,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe } } - private HttpResponse checkMonitor(final URI location) { + protected final HttpResponse checkMonitor(final URI location) { if (location == null) { throw new AsyncRequestException("Invalid async request response. Missing monitor URL"); } @@ -287,10 +297,8 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe return executeHttpRequest(httpClient, monitor); } - private HttpResponse executeHttpRequest(final HttpClient client, final HttpUriRequest req) { - checkRequest(odataClient, request); - - HttpResponse response; + protected final HttpResponse executeHttpRequest(final HttpClient client, final HttpUriRequest req) { + final HttpResponse response; try { response = client.execute(req); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java ---------------------------------------------------------------------- diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java index d66fa20..fe8fa52 100644 --- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java +++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; -import java.nio.charset.Charset; import java.util.Collection; import java.util.HashSet; import java.util.Map; @@ -35,6 +34,7 @@ import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.HttpClient; import org.apache.olingo.client.api.communication.header.HeaderName; +import org.apache.olingo.client.api.communication.request.ODataStreamer; import org.apache.olingo.client.api.communication.request.batch.ODataBatchLineIterator; import org.apache.olingo.client.api.communication.response.ODataResponse; import org.apache.olingo.client.api.http.NoContentException; @@ -257,10 +257,10 @@ public abstract class AbstractODataResponse implements ODataResponse { this.headers.putAll(partHeaders); final ByteArrayOutputStream bos = new ByteArrayOutputStream(); - LOG.debug("Retrieved payload {}", bos.toString(Charset.forName("UTF-8").toString())); while (batchLineIterator.hasNext()) { - bos.write(batchLineIterator.nextLine().getBytes()); + bos.write(batchLineIterator.nextLine().getBytes(Constants.UTF8)); + bos.write(ODataStreamer.CRLF); } try { http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java ---------------------------------------------------------------------- diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java index cf7da13..a800037 100644 --- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java +++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java @@ -18,9 +18,13 @@ */ package org.apache.olingo.client.core.communication.response.v4; +import java.util.Collection; +import java.util.Map; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; +import org.apache.olingo.client.api.communication.request.batch.ODataBatchLineIterator; import org.apache.olingo.client.api.communication.response.v4.AsyncResponse; +import org.apache.olingo.client.core.communication.request.batch.ODataBatchController; import org.apache.olingo.client.core.communication.response.AbstractODataResponse; /** @@ -46,4 +50,27 @@ public class AsyncResponseImpl extends AbstractODataResponse implements AsyncRes public AsyncResponseImpl(final HttpClient client, final HttpResponse res) { super(client, res); } + + /** + * Constructor to be used inside a batch item. + */ + public AsyncResponseImpl( + final Map.Entry<Integer, String> responseLine, + final Map<String, Collection<String>> headers, + final ODataBatchLineIterator batchLineIterator, + final String boundary) { + super(); + + if (hasBeenInitialized) { + throw new IllegalStateException("Request already initialized"); + } + + this.hasBeenInitialized = true; + + this.batchInfo = new ODataBatchController(batchLineIterator, boundary); + + this.statusCode = responseLine.getKey(); + this.statusMessage = responseLine.getValue(); + this.headers.putAll(headers); + } } http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java ---------------------------------------------------------------------- diff --git a/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java b/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java index 8a798dd..6c24789 100644 --- a/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java +++ b/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java @@ -26,12 +26,14 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.net.URI; import java.util.Calendar; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.http.HttpResponse; import org.apache.olingo.client.api.ODataBatchConstants; +import org.apache.olingo.client.api.communication.header.HeaderName; import org.apache.olingo.client.api.communication.request.ODataStreamManager; import org.apache.olingo.client.api.communication.request.batch.ODataBatchResponseItem; import org.apache.olingo.client.api.communication.request.batch.ODataChangeset; @@ -44,10 +46,13 @@ import org.apache.olingo.client.api.communication.request.cud.ODataEntityUpdateR import org.apache.olingo.client.api.communication.request.cud.v4.UpdateType; import org.apache.olingo.client.api.communication.request.retrieve.ODataEntityRequest; import org.apache.olingo.client.api.communication.request.retrieve.ODataEntitySetRequest; +import org.apache.olingo.client.api.communication.request.v4.AsyncBatchRequestWrapper; import org.apache.olingo.client.api.communication.response.ODataBatchResponse; import org.apache.olingo.client.api.communication.response.ODataEntityCreateResponse; import org.apache.olingo.client.api.communication.response.ODataEntityUpdateResponse; import org.apache.olingo.client.api.communication.response.ODataResponse; +import org.apache.olingo.client.api.communication.response.v4.AsyncResponse; +import org.apache.olingo.client.api.communication.response.v4.AsyncResponseWrapper; import org.apache.olingo.client.api.uri.v4.URIBuilder; import org.apache.olingo.client.core.communication.request.AbstractODataStreamManager; import org.apache.olingo.client.core.communication.request.Wrapper; @@ -397,7 +402,7 @@ public class BatchTestITCase extends AbstractTestITCase { } @Test - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked"}) public void batchRequest() throws EdmPrimitiveTypeException { // create your request final ODataBatchRequest request = client.getBatchRequestFactory().getBatchRequest(testStaticServiceRootURL); @@ -412,8 +417,7 @@ public class BatchTestITCase extends AbstractTestITCase { // prepare URI URIBuilder targetURI = client.getURIBuilder(testStaticServiceRootURL); - targetURI.appendEntitySetSegment("Customers").appendKeySegment(1);//. -// expand("Orders").select("PersonID,Orders/OrderID"); + targetURI.appendEntitySetSegment("Customers").appendKeySegment(1); // create new request ODataEntityRequest<ODataEntity> queryReq = client.getRetrieveRequestFactory().getEntityRequest(targetURI.build()); @@ -520,9 +524,79 @@ public class BatchTestITCase extends AbstractTestITCase { entres = (ODataEntityRequestImpl.ODataEntityResponseImpl) res; entity = entres.getBody(); - assertEquals("new last name", - entity.getProperty("LastName").getPrimitiveValue().toCastValue(String.class)); + assertEquals("new last name", entity.getProperty("LastName").getPrimitiveValue().toCastValue(String.class)); + + assertFalse(iter.hasNext()); + } + + @Test + public void async() { + // create your request + final ODataBatchRequest request = client.getBatchRequestFactory().getBatchRequest( + URI.create(testStaticServiceRootURL + "/async/").normalize().toASCIIString()); + request.setAccept(ACCEPT); + + final AsyncBatchRequestWrapper async = client.getAsyncRequestFactory().getAsyncBatchRequestWrapper(request); + + // ------------------------------------------- + // Add retrieve item + // ------------------------------------------- + ODataRetrieve retrieve = async.addRetrieve(); + // prepare URI + URIBuilder targetURI = client.getURIBuilder(testStaticServiceRootURL); + targetURI.appendEntitySetSegment("People").appendKeySegment(5); + + // create new request + ODataEntityRequest<ODataEntity> queryReq = client.getRetrieveRequestFactory().getEntityRequest(targetURI.build()); + queryReq.setFormat(ODataPubFormat.JSON); + + retrieve.setRequest(queryReq); + // ------------------------------------------- + + // ------------------------------------------- + // Add retrieve item + // ------------------------------------------- + retrieve = async.addRetrieve(); + + // prepare URI + targetURI = client.getURIBuilder(testStaticServiceRootURL).appendEntitySetSegment("Customers").appendKeySegment(1); + + // create new request + queryReq = client.getRetrieveRequestFactory().getEntityRequest(targetURI.build()); + + retrieve.setRequest(queryReq); + // ------------------------------------------- + + final AsyncResponseWrapper<ODataBatchResponse> responseWrapper = async.execute(); + + assertTrue(responseWrapper.isPreferenceApplied()); + assertTrue(responseWrapper.isDone()); + + final ODataBatchResponse response = responseWrapper.getODataResponse(); + + assertEquals(200, response.getStatusCode()); + assertEquals("Ok", response.getStatusMessage()); + final Iterator<ODataBatchResponseItem> iter = response.getBody(); + + // retrieve the first item (ODataRetrieve) + ODataBatchResponseItem item = iter.next(); + assertTrue(item instanceof ODataRetrieveResponseItem); + + // The service return interim results to an asynchronously executing batch. + ODataRetrieveResponseItem retitem = (ODataRetrieveResponseItem) item; + ODataResponse res = retitem.next(); + assertTrue(res instanceof AsyncResponse); + assertEquals(202, res.getStatusCode()); + assertEquals("Accepted", res.getStatusMessage()); + + Collection<String> newMonitorLocation = res.getHeader(HeaderName.location); + if (newMonitorLocation != null && !newMonitorLocation.isEmpty()) { + responseWrapper.forceNextMonitorCheck(URI.create(newMonitorLocation.iterator().next())); + // .... now you can start again with isDone() and getODataResponse(). + } + + assertFalse(retitem.hasNext()); assertFalse(iter.hasNext()); }
