This is an automated email from the ASF dual-hosted git repository.
mibo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/olingo-odata4.git
The following commit(s) were added to refs/heads/master by this push:
new b8ac17e [OLINGO-1343] Fix deadlock Piped_Stream (by Aleksandr)
b8ac17e is described below
commit b8ac17e4ae0d400734f1f531308c7beff297d55e
Author: mibo <[email protected]>
AuthorDate: Fri Apr 26 20:48:08 2019 +0200
[OLINGO-1343] Fix deadlock Piped_Stream (by Aleksandr)
---
.../olingo/client/core/ConfigurationImpl.java | 2 +
.../request/AbstractODataStreamManager.java | 12 +-
.../request/AbstractODataStreamer.java | 2 +-
.../request/batch/AbstractODataBatchRequest.java | 4 +-
.../batch/AbstractODataBatchRequestItem.java | 3 +-
.../response/AbstractODataResponse.java | 12 +-
.../core/communication/util/PipedInputStream.java | 219 +++++++++++++++++++++
.../core/communication/util/PipedOutputStream.java | 182 +++++++++++++++++
8 files changed, 421 insertions(+), 15 deletions(-)
diff --git
a/lib/client-core/src/main/java/org/apache/olingo/client/core/ConfigurationImpl.java
b/lib/client-core/src/main/java/org/apache/olingo/client/core/ConfigurationImpl.java
index 9f88bdb..2a49935 100644
---
a/lib/client-core/src/main/java/org/apache/olingo/client/core/ConfigurationImpl.java
+++
b/lib/client-core/src/main/java/org/apache/olingo/client/core/ConfigurationImpl.java
@@ -60,6 +60,8 @@ public class ConfigurationImpl implements Configuration {
private static final String CONTINUE_ON_ERROR = "continueOnError";
+ public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; // 4MB
+
private final Map<String, Object> CONF = new HashMap<String, Object>();
private transient ExecutorService executor = createExecutor(10);
diff --git
a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/AbstractODataStreamManager.java
b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/AbstractODataStreamManager.java
index 6dcd858..8495f61 100644
---
a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/AbstractODataStreamManager.java
+++
b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/AbstractODataStreamManager.java
@@ -18,10 +18,7 @@
*/
package org.apache.olingo.client.core.communication.request;
-import java.io.IOException;
import java.io.InputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -32,6 +29,9 @@ import org.apache.http.HttpResponse;
import org.apache.olingo.client.api.communication.request.ODataPayloadManager;
import org.apache.olingo.client.api.communication.response.ODataResponse;
import org.apache.olingo.client.api.http.HttpClientException;
+import org.apache.olingo.client.core.ConfigurationImpl;
+import org.apache.olingo.client.core.communication.util.PipedInputStream;
+import org.apache.olingo.client.core.communication.util.PipedOutputStream;
/**
* OData request payload management abstract class.
@@ -62,7 +62,7 @@ public abstract class AbstractODataStreamManager<T extends
ODataResponse> extend
* @param futureWrap wrapper of the Future object of the HttpResponse.
*/
public AbstractODataStreamManager(final Wrapper<Future<HttpResponse>>
futureWrap) {
- this(futureWrap, new PipedOutputStream());
+ this(futureWrap, new PipedOutputStream(null,
ConfigurationImpl.DEFAULT_BUFFER_SIZE));
}
/**
@@ -76,8 +76,8 @@ public abstract class AbstractODataStreamManager<T extends
ODataResponse> extend
this.futureWrap = futureWrap;
try {
- this.body = new PipedInputStream(getBodyStreamWriter());
- } catch (IOException e) {
+ this.body = new PipedInputStream(getBodyStreamWriter(),
ConfigurationImpl.DEFAULT_BUFFER_SIZE);
+ } catch (Exception e) {
throw new IllegalStateException(e);
}
this.defaultBody = this.body;
diff --git
a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/AbstractODataStreamer.java
b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/AbstractODataStreamer.java
index c470916..f98e16e 100644
---
a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/AbstractODataStreamer.java
+++
b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/AbstractODataStreamer.java
@@ -20,10 +20,10 @@ package org.apache.olingo.client.core.communication.request;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.PipedOutputStream;
import java.util.Arrays;
import org.apache.olingo.client.api.communication.request.ODataStreamer;
+import org.apache.olingo.client.core.communication.util.PipedOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/AbstractODataBatchRequest.java
b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/AbstractODataBatchRequest.java
index d99cf9c..8655ccc 100644
---
a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/AbstractODataBatchRequest.java
+++
b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/AbstractODataBatchRequest.java
@@ -18,7 +18,6 @@
*/
package org.apache.olingo.client.core.communication.request.batch;
-import java.io.PipedOutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@@ -30,6 +29,7 @@ import
org.apache.olingo.client.api.communication.request.ODataPayloadManager;
import
org.apache.olingo.client.api.communication.request.batch.ODataBatchRequest;
import
org.apache.olingo.client.api.communication.request.batch.ODataBatchResponseItem;
import org.apache.olingo.client.api.communication.response.ODataResponse;
+import org.apache.olingo.client.core.communication.util.PipedOutputStream;
import
org.apache.olingo.client.core.communication.request.streamed.AbstractODataStreamedRequest;
import org.apache.olingo.commons.api.format.ContentType;
import org.apache.olingo.commons.api.http.HttpMethod;
@@ -71,7 +71,7 @@ public abstract class AbstractODataBatchRequest<V extends
ODataResponse, T exten
}
public PipedOutputStream getOutputStream() {
- return getPayloadManager().getBodyStreamWriter();
+ return (PipedOutputStream) getPayloadManager().getBodyStreamWriter();
}
/**
diff --git
a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/AbstractODataBatchRequestItem.java
b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/AbstractODataBatchRequestItem.java
index eccd447..3b9972c 100644
---
a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/AbstractODataBatchRequestItem.java
+++
b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/AbstractODataBatchRequestItem.java
@@ -24,6 +24,7 @@ import org.apache.olingo.client.api.ODataBatchConstants;
import
org.apache.olingo.client.api.communication.request.ODataBatchableRequest;
import
org.apache.olingo.client.api.communication.request.batch.ODataBatchRequest;
import
org.apache.olingo.client.api.communication.request.batch.ODataBatchRequestItem;
+import org.apache.olingo.client.core.communication.util.PipedOutputStream;
import
org.apache.olingo.client.core.communication.request.AbstractODataStreamer;
/**
@@ -54,7 +55,7 @@ public abstract class AbstractODataBatchRequestItem extends
AbstractODataStreame
* @param req OData batch request.
*/
public AbstractODataBatchRequestItem(final ODataBatchRequest req) {
- super(req.getOutputStream());
+ super((PipedOutputStream) req.getOutputStream());
this.open = true;
this.req = req;
}
diff --git
a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java
b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java
index 8be5d35..bd2c481 100644
---
a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java
+++
b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java
@@ -22,8 +22,6 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
@@ -40,6 +38,9 @@ import
org.apache.olingo.client.api.communication.request.ODataStreamer;
import
org.apache.olingo.client.api.communication.request.batch.ODataBatchLineIterator;
import org.apache.olingo.client.api.communication.response.ODataResponse;
import org.apache.olingo.client.api.http.NoContentException;
+import org.apache.olingo.client.core.ConfigurationImpl;
+import org.apache.olingo.client.core.communication.util.PipedInputStream;
+import org.apache.olingo.client.core.communication.util.PipedOutputStream;
import
org.apache.olingo.client.core.communication.request.batch.ODataBatchController;
import
org.apache.olingo.client.core.communication.request.batch.ODataBatchLineIteratorImpl;
import
org.apache.olingo.client.core.communication.request.batch.ODataBatchUtilities;
@@ -264,10 +265,11 @@ public abstract class AbstractODataResponse implements
ODataResponse {
if (payload == null && batchInfo != null && batchInfo.isValidBatch()) {
// get input stream till the end of item
- payload = new PipedInputStream();
+ payload = new PipedInputStream(null);
try {
- final PipedOutputStream os = new PipedOutputStream((PipedInputStream)
payload);
+ final PipedOutputStream os = new PipedOutputStream((PipedInputStream)
payload,
+ ConfigurationImpl.DEFAULT_BUFFER_SIZE);
new Thread(new Runnable() {
@Override
@@ -281,7 +283,7 @@ public abstract class AbstractODataResponse implements
ODataResponse {
}
}
}).start();
- } catch (IOException e) {
+ } catch (Exception e) {
LOG.error("Error streaming payload response", e);
throw new IllegalStateException(e);
}
diff --git
a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/util/PipedInputStream.java
b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/util/PipedInputStream.java
new file mode 100644
index 0000000..b7ec435
--- /dev/null
+++
b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/util/PipedInputStream.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.olingo.client.core.communication.util;
+
+import org.apache.olingo.client.core.ConfigurationImpl;
+
+import java.io.IOException;
+
+/**
+ * This class is equivalent to <code>java.io.PipedInputStream</code>. In the
+ * interface it only adds a constructor which allows for specifying the buffer
+ * size. Its implementation, however, is much simpler and a lot more efficient
+ * than its equivalent. It doesn't rely on polling. Instead it uses proper
+ * synchronization with its counterpart
<code>be.re.io.PipedOutputStream</code>.
+ *
+ * Multiple readers can read from this stream concurrently. The block asked for
+ * by a reader is delivered completely, or until the end of the stream if less
+ * is available. Other readers can't come in between.
+ *
+ * @author WD
+ */
+public class PipedInputStream extends java.io.PipedInputStream {
+
+ final Object sync = new Object();
+ byte[] buffer;
+ boolean closed = false;
+ int readLaps = 0;
+ int readPosition = 0;
+ PipedOutputStream source;
+ int writeLaps = 0;
+ int writePosition = 0;
+
+ /**
+ * Creates an unconnected PipedInputStream with a default buffer size.
+ */
+ protected PipedInputStream() {
+ this(null);
+ }
+
+ /**
+ * Creates a PipedInputStream with a default buffer size and connects it to
+ * <code>source</code>.
+ *
+ * @exception IOException
+ * It was already connected.
+ */
+ public PipedInputStream(PipedOutputStream source) {
+ this(source, ConfigurationImpl.DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Creates a PipedInputStream with buffer size <code>bufferSize</code> and
+ * connects it to <code>source</code>.
+ *
+ */
+ public PipedInputStream(PipedOutputStream source, int bufferSize) {
+ if (source != null) {
+ try {
+ connect(source);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ buffer = new byte[bufferSize];
+ }
+
+ public int available() {
+ /*
+ * The circular buffer is inspected to see where the reader and the
+ * writer are located.
+ */
+ return writePosition > readPosition
+ /* The writer is in the same lap. */ ? writePosition - readPosition
+ : (writePosition < readPosition
+ /* The writer is in the next lap. */ ? buffer.length
+ - readPosition
+ + 1
+ + writePosition
+ :
+ /*
+ * The writer is at the same position or a complete lap
+ * ahead.
+ */
+ (writeLaps > readLaps ? buffer.length : 0));
+ }
+
+ /**
+ * @exception IOException
+ * The pipe is not connected.
+ */
+ public void close() throws IOException {
+ if (source == null) {
+ throw new IOException("Unconnected pipe");
+ }
+
+ synchronized (sync) {
+ closed = true;
+ // Release any pending writers.
+ sync.notifyAll();
+ }
+ }
+
+ /**
+ * @exception IOException
+ * The pipe is already connected.
+ */
+ public void connect(PipedOutputStream source) throws IOException {
+ if (this.source != null) {
+ throw new IOException("Pipe already connected");
+ }
+
+ this.source = source;
+ source.sink = this;
+ }
+
+ public void mark(int readLimit) {
+ /* not supported */
+ }
+
+ public boolean markSupported() {
+ return false;
+ }
+
+ public int read() throws IOException {
+ byte[] b = new byte[0];
+ int result = read(b);
+
+ return result == -1 ? -1 : b[0];
+ }
+
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ /**
+ * @exception IOException
+ * The pipe is not connected.
+ */
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (source == null) {
+ throw new IOException("Unconnected pipe");
+ }
+
+ synchronized (sync) {
+ if (writePosition == readPosition && writeLaps == readLaps) {
+ if (closed) {
+ return -1;
+ }
+
+ // Wait for any writer to put something in the circular buffer.
+
+ try {
+ sync.wait();
+ } catch (InterruptedException e) {
+ throw new IOException(e.getMessage());
+ }
+
+ // Try again.
+
+ return read(b, off, len);
+ }
+
+ // Don't read more than the capacity indicated by len or what's
+ // available
+ // in the circular buffer.
+ int amount = Math .min(len,
+ (writePosition > readPosition
+ ? writePosition
+ : buffer.length)
+ -
+ readPosition);
+
+ System.arraycopy(
+ buffer,
+ readPosition,
+ b,
+ off,
+ amount);
+ readPosition += amount;
+
+ if (readPosition == buffer.length) {
+ // A lap was completed, so go // back.
+ readPosition = 0;
+ readLaps++;
+ }
+
+ // The buffer is only released when the complete desired block was
+ // obtained.
+
+ if (amount < len) {
+ int second = read(b, off + amount, len - amount);
+
+ return second == -1 ? amount : amount + second;
+ } else {
+ sync.notifyAll();
+ }
+
+ return amount;
+ }
+ }
+
+}
diff --git
a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/util/PipedOutputStream.java
b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/util/PipedOutputStream.java
new file mode 100644
index 0000000..1d03196
--- /dev/null
+++
b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/util/PipedOutputStream.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.olingo.client.core.communication.util;
+
+import org.apache.olingo.client.core.ConfigurationImpl;
+
+import java.io.IOException;
+
+/**
+ * This class is equivalent to <code>java.io.PipedOutputStream</code>. In the
+ * interface it only adds a constructor which allows for specifying the buffer
+ * size. Its implementation, however, is much simpler and a lot more efficient
+ * than its equivalent. It doesn't rely on polling. Instead it uses proper
+ * synchronization with its counterpart <code>be.re.io.PipedInputStream</code>.
+ *
+ * Multiple writers can write in this stream concurrently. The block written by
+ * a writer is put in completely. Other writers can't come in between.
+ *
+ * @author WD
+ */
+public class PipedOutputStream extends java.io.PipedOutputStream {
+
+ PipedInputStream sink;
+
+ /**
+ * Creates an unconnected PipedOutputStream.
+ */
+
+ protected PipedOutputStream() {
+ this(null);
+ }
+
+ /**
+ * Creates a PipedOutputStream with a default buffer size and connects it to
+ * <code>sink</code>.
+ *
+ */
+ public PipedOutputStream(PipedInputStream sink) {
+ this(sink, ConfigurationImpl.DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Creates a PipedOutputStream with buffer size <code>bufferSize</code> and
+ * connects it to <code>sink</code>.
+ *
+ */
+ public PipedOutputStream(PipedInputStream sink, int bufferSize) {
+ if (sink != null) {
+ try {
+ connect(sink);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ sink.buffer = new byte[bufferSize];
+ }
+ }
+
+ /**
+ * @exception IOException
+ * The pipe is not connected.
+ */
+ public void close() throws IOException {
+ if (sink == null) {
+ throw new IOException("Unconnected pipe");
+ }
+
+ synchronized (sink.sync) {
+ sink.closed = true;
+ flush();
+ }
+ }
+
+ /**
+ * @exception IOException
+ * The pipe is already connected.
+ */
+
+ public void connect(PipedInputStream sink) throws IOException {
+ if (this.sink != null) {
+ throw new IOException("Pipe already connected");
+ }
+
+ this.sink = sink;
+ sink.source = this;
+ }
+
+ public void flush() throws IOException {
+ synchronized (sink.sync) {
+ // Release all readers.
+ sink.sync.notifyAll();
+ }
+ }
+
+ public void write(int b) throws IOException {
+ write(new byte[] { (byte) b });
+ }
+
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ /**
+ * @exception IOException
+ * The pipe is not connected or a reader has closed it.
+ */
+
+ public void write(byte[] b, int off, int len) throws IOException {
+ if (sink == null) {
+ throw new IOException("Unconnected pipe");
+ }
+
+ if (sink.closed) {
+ throw new IOException("Broken pipe");
+ }
+
+ synchronized (sink.sync) {
+ if (sink.writePosition == sink.readPosition
+ && sink.writeLaps > sink.readLaps) {
+ // The circular buffer is full, so wait for some reader to
+ // consume something.
+ try {
+ sink.sync.wait();
+ } catch (InterruptedException e) {
+ throw new IOException(e.getMessage());
+ }
+
+ // Try again.
+
+ write(b, off, len);
+
+ return;
+ }
+
+ // Don't write more than the capacity indicated by len or the space
+ // available in the circular buffer.
+
+ int amount = Math.min(len,
+ (sink.writePosition < sink.readPosition
+ ? sink.readPosition
+ : sink.buffer.length)
+ - sink.writePosition);
+
+ System.arraycopy(
+ b,
+ off,
+ sink.buffer,
+ sink.writePosition,
+ amount);
+ sink.writePosition += amount;
+
+ if (sink.writePosition == sink.buffer.length) {
+ sink.writePosition = 0;
+ ++sink.writeLaps;
+ }
+
+ // The buffer is only released when the complete desired block was
+ // written.
+ if (amount < len) {
+ write(b, off + amount, len - amount);
+ } else {
+ sink.sync.notifyAll();
+ }
+ }
+ }
+
+}
\ No newline at end of file