This is an automated email from the ASF dual-hosted git repository.

mibo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/olingo-odata4.git


The following commit(s) were added to refs/heads/master by this push:
     new b8ac17e  [OLINGO-1343] Fix deadlock Piped_Stream (by Aleksandr)
b8ac17e is described below

commit b8ac17e4ae0d400734f1f531308c7beff297d55e
Author: mibo <[email protected]>
AuthorDate: Fri Apr 26 20:48:08 2019 +0200

    [OLINGO-1343] Fix deadlock Piped_Stream (by Aleksandr)
---
 .../olingo/client/core/ConfigurationImpl.java      |   2 +
 .../request/AbstractODataStreamManager.java        |  12 +-
 .../request/AbstractODataStreamer.java             |   2 +-
 .../request/batch/AbstractODataBatchRequest.java   |   4 +-
 .../batch/AbstractODataBatchRequestItem.java       |   3 +-
 .../response/AbstractODataResponse.java            |  12 +-
 .../core/communication/util/PipedInputStream.java  | 219 +++++++++++++++++++++
 .../core/communication/util/PipedOutputStream.java | 182 +++++++++++++++++
 8 files changed, 421 insertions(+), 15 deletions(-)

diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/ConfigurationImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/ConfigurationImpl.java
index 9f88bdb..2a49935 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/ConfigurationImpl.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/ConfigurationImpl.java
@@ -60,6 +60,8 @@ public class ConfigurationImpl implements Configuration {
 
   private static final String CONTINUE_ON_ERROR = "continueOnError";
 
+  public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;  // 4MB
+
   private final Map<String, Object> CONF = new HashMap<String, Object>();
 
   private transient ExecutorService executor = createExecutor(10);
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/AbstractODataStreamManager.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/AbstractODataStreamManager.java
index 6dcd858..8495f61 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/AbstractODataStreamManager.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/AbstractODataStreamManager.java
@@ -18,10 +18,7 @@
  */
 package org.apache.olingo.client.core.communication.request;
 
-import java.io.IOException;
 import java.io.InputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -32,6 +29,9 @@ import org.apache.http.HttpResponse;
 import org.apache.olingo.client.api.communication.request.ODataPayloadManager;
 import org.apache.olingo.client.api.communication.response.ODataResponse;
 import org.apache.olingo.client.api.http.HttpClientException;
+import org.apache.olingo.client.core.ConfigurationImpl;
+import org.apache.olingo.client.core.communication.util.PipedInputStream;
+import org.apache.olingo.client.core.communication.util.PipedOutputStream;
 
 /**
  * OData request payload management abstract class.
@@ -62,7 +62,7 @@ public abstract class AbstractODataStreamManager<T extends 
ODataResponse> extend
    * @param futureWrap wrapper of the Future object of the HttpResponse.
    */
   public AbstractODataStreamManager(final Wrapper<Future<HttpResponse>> 
futureWrap) {
-    this(futureWrap, new PipedOutputStream());
+    this(futureWrap, new PipedOutputStream(null, 
ConfigurationImpl.DEFAULT_BUFFER_SIZE));
   }
 
   /**
@@ -76,8 +76,8 @@ public abstract class AbstractODataStreamManager<T extends 
ODataResponse> extend
 
     this.futureWrap = futureWrap;
     try {
-      this.body = new PipedInputStream(getBodyStreamWriter());
-    } catch (IOException e) {
+      this.body = new PipedInputStream(getBodyStreamWriter(), 
ConfigurationImpl.DEFAULT_BUFFER_SIZE);
+    } catch (Exception e) {
       throw new IllegalStateException(e);
     }
     this.defaultBody = this.body;
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/AbstractODataStreamer.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/AbstractODataStreamer.java
index c470916..f98e16e 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/AbstractODataStreamer.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/AbstractODataStreamer.java
@@ -20,10 +20,10 @@ package org.apache.olingo.client.core.communication.request;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.PipedOutputStream;
 import java.util.Arrays;
 
 import org.apache.olingo.client.api.communication.request.ODataStreamer;
+import org.apache.olingo.client.core.communication.util.PipedOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/AbstractODataBatchRequest.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/AbstractODataBatchRequest.java
index d99cf9c..8655ccc 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/AbstractODataBatchRequest.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/AbstractODataBatchRequest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.olingo.client.core.communication.request.batch;
 
-import java.io.PipedOutputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
@@ -30,6 +29,7 @@ import 
org.apache.olingo.client.api.communication.request.ODataPayloadManager;
 import 
org.apache.olingo.client.api.communication.request.batch.ODataBatchRequest;
 import 
org.apache.olingo.client.api.communication.request.batch.ODataBatchResponseItem;
 import org.apache.olingo.client.api.communication.response.ODataResponse;
+import org.apache.olingo.client.core.communication.util.PipedOutputStream;
 import 
org.apache.olingo.client.core.communication.request.streamed.AbstractODataStreamedRequest;
 import org.apache.olingo.commons.api.format.ContentType;
 import org.apache.olingo.commons.api.http.HttpMethod;
@@ -71,7 +71,7 @@ public abstract class AbstractODataBatchRequest<V extends 
ODataResponse, T exten
   }
 
   public PipedOutputStream getOutputStream() {
-    return getPayloadManager().getBodyStreamWriter();
+    return (PipedOutputStream) getPayloadManager().getBodyStreamWriter();
   }
 
   /**
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/AbstractODataBatchRequestItem.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/AbstractODataBatchRequestItem.java
index eccd447..3b9972c 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/AbstractODataBatchRequestItem.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/AbstractODataBatchRequestItem.java
@@ -24,6 +24,7 @@ import org.apache.olingo.client.api.ODataBatchConstants;
 import 
org.apache.olingo.client.api.communication.request.ODataBatchableRequest;
 import 
org.apache.olingo.client.api.communication.request.batch.ODataBatchRequest;
 import 
org.apache.olingo.client.api.communication.request.batch.ODataBatchRequestItem;
+import org.apache.olingo.client.core.communication.util.PipedOutputStream;
 import 
org.apache.olingo.client.core.communication.request.AbstractODataStreamer;
 
 /**
@@ -54,7 +55,7 @@ public abstract class AbstractODataBatchRequestItem extends 
AbstractODataStreame
    * @param req OData batch request.
    */
   public AbstractODataBatchRequestItem(final ODataBatchRequest req) {
-    super(req.getOutputStream());
+    super((PipedOutputStream) req.getOutputStream());
     this.open = true;
     this.req = req;
   }
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java
index 8be5d35..bd2c481 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java
@@ -22,8 +22,6 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
@@ -40,6 +38,9 @@ import 
org.apache.olingo.client.api.communication.request.ODataStreamer;
 import 
org.apache.olingo.client.api.communication.request.batch.ODataBatchLineIterator;
 import org.apache.olingo.client.api.communication.response.ODataResponse;
 import org.apache.olingo.client.api.http.NoContentException;
+import org.apache.olingo.client.core.ConfigurationImpl;
+import org.apache.olingo.client.core.communication.util.PipedInputStream;
+import org.apache.olingo.client.core.communication.util.PipedOutputStream;
 import 
org.apache.olingo.client.core.communication.request.batch.ODataBatchController;
 import 
org.apache.olingo.client.core.communication.request.batch.ODataBatchLineIteratorImpl;
 import 
org.apache.olingo.client.core.communication.request.batch.ODataBatchUtilities;
@@ -264,10 +265,11 @@ public abstract class AbstractODataResponse implements 
ODataResponse {
 
     if (payload == null && batchInfo != null && batchInfo.isValidBatch()) {
       // get input stream till the end of item
-      payload = new PipedInputStream();
+      payload = new PipedInputStream(null);
 
       try {
-        final PipedOutputStream os = new PipedOutputStream((PipedInputStream) 
payload);
+        final PipedOutputStream os = new PipedOutputStream((PipedInputStream) 
payload,
+                ConfigurationImpl.DEFAULT_BUFFER_SIZE);
 
         new Thread(new Runnable() {
           @Override
@@ -281,7 +283,7 @@ public abstract class AbstractODataResponse implements 
ODataResponse {
             }
           }
         }).start();
-      } catch (IOException e) {
+      } catch (Exception e) {
         LOG.error("Error streaming payload response", e);
         throw new IllegalStateException(e);
       }
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/util/PipedInputStream.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/util/PipedInputStream.java
new file mode 100644
index 0000000..b7ec435
--- /dev/null
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/util/PipedInputStream.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.olingo.client.core.communication.util;
+
+import org.apache.olingo.client.core.ConfigurationImpl;
+
+import java.io.IOException;
+
+/**
+ * This class is equivalent to <code>java.io.PipedInputStream</code>. In the
+ * interface it only adds a constructor which allows for specifying the buffer
+ * size. Its implementation, however, is much simpler and a lot more efficient
+ * than its equivalent. It doesn't rely on polling. Instead it uses proper
+ * synchronization with its counterpart 
<code>be.re.io.PipedOutputStream</code>.
+ *
+ * Multiple readers can read from this stream concurrently. The block asked for
+ * by a reader is delivered completely, or until the end of the stream if less
+ * is available. Other readers can't come in between.
+ * 
+ * @author WD
+ */
+public class PipedInputStream extends java.io.PipedInputStream {
+
+  final Object sync = new Object();
+  byte[] buffer;
+  boolean closed = false;
+  int readLaps = 0;
+  int readPosition = 0;
+  PipedOutputStream source;
+  int writeLaps = 0;
+  int writePosition = 0;
+
+  /**
+   * Creates an unconnected PipedInputStream with a default buffer size.
+   */
+  protected PipedInputStream() {
+    this(null);
+  }
+
+  /**
+   * Creates a PipedInputStream with a default buffer size and connects it to
+   * <code>source</code>.
+   * 
+   * @exception IOException
+   *   It was already connected.
+   */
+  public PipedInputStream(PipedOutputStream source) {
+    this(source, ConfigurationImpl.DEFAULT_BUFFER_SIZE);
+  }
+
+  /**
+   * Creates a PipedInputStream with buffer size <code>bufferSize</code> and
+   * connects it to <code>source</code>.
+   * 
+   */
+  public PipedInputStream(PipedOutputStream source, int bufferSize) {
+    if (source != null) {
+      try {
+        connect(source);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    buffer = new byte[bufferSize];
+  }
+
+  public int available() {
+    /*
+     * The circular buffer is inspected to see where the reader and the
+     * writer are located.
+     */
+    return writePosition > readPosition
+        /* The writer is in the same lap. */ ? writePosition - readPosition
+        : (writePosition < readPosition
+            /* The writer is in the next lap. */ ? buffer.length
+                - readPosition
+                + 1
+                + writePosition
+            :
+            /*
+             * The writer is at the same position or a complete lap
+             * ahead.
+             */
+            (writeLaps > readLaps ? buffer.length : 0));
+  }
+
+  /**
+   * @exception IOException
+   *   The pipe is not connected.
+   */
+  public void close() throws IOException {
+    if (source == null) {
+      throw new IOException("Unconnected pipe");
+    }
+
+    synchronized (sync) {
+      closed = true;
+      // Release any pending writers.
+      sync.notifyAll();
+    }
+  }
+
+  /**
+   * @exception IOException
+   *   The pipe is already connected.
+   */
+  public void connect(PipedOutputStream source) throws IOException {
+    if (this.source != null) {
+      throw new IOException("Pipe already connected");
+    }
+
+    this.source = source;
+    source.sink = this;
+  }
+
+  public void mark(int readLimit) {
+    /* not supported */
+  }
+
+  public boolean markSupported() {
+    return false;
+  }
+
+  public int read() throws IOException {
+    byte[] b = new byte[0];
+    int result = read(b);
+
+    return result == -1 ? -1 : b[0];
+  }
+
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  /**
+   * @exception IOException
+   *   The pipe is not connected.
+   */
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (source == null) {
+      throw new IOException("Unconnected pipe");
+    }
+
+    synchronized (sync) {
+      if (writePosition == readPosition && writeLaps == readLaps) {
+        if (closed) {
+          return -1;
+        }
+
+        // Wait for any writer to put something in the circular buffer.
+
+        try {
+          sync.wait();
+        } catch (InterruptedException e) {
+          throw new IOException(e.getMessage());
+        }
+
+        // Try again.
+
+        return read(b, off, len);
+      }
+
+      // Don't read more than the capacity indicated by len or what's
+      // available
+      // in the circular buffer.
+      int amount = Math .min(len,
+                  (writePosition > readPosition
+                      ? writePosition
+                      : buffer.length)
+                      -
+                      readPosition);
+
+      System.arraycopy(
+              buffer,
+              readPosition,
+              b,
+              off,
+              amount);
+      readPosition += amount;
+
+      if (readPosition == buffer.length) {
+        // A lap was completed, so go // back.
+        readPosition = 0;
+        readLaps++;
+      }
+
+      // The buffer is only released when the complete desired block was
+      // obtained.
+
+      if (amount < len) {
+        int second = read(b, off + amount, len - amount);
+
+        return second == -1 ? amount : amount + second;
+      } else {
+        sync.notifyAll();
+      }
+
+      return amount;
+    }
+  }
+
+}
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/util/PipedOutputStream.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/util/PipedOutputStream.java
new file mode 100644
index 0000000..1d03196
--- /dev/null
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/util/PipedOutputStream.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.olingo.client.core.communication.util;
+
+import org.apache.olingo.client.core.ConfigurationImpl;
+
+import java.io.IOException;
+
+/**
+ * Writing end of an in-memory pipe. It is the counterpart of the
+ * <code>PipedInputStream</code> class in this package, which owns the
+ * circular buffer and the monitor both ends synchronize on. The class mirrors
+ * the interface of <code>java.io.PipedOutputStream</code> and adds a
+ * constructor which allows for specifying the buffer size. It doesn't rely on
+ * polling; it uses <code>wait</code>/<code>notifyAll</code> on the sink's
+ * monitor instead.
+ *
+ * Multiple writers can write in this stream concurrently. A call to
+ * {@link #write(byte[], int, int)} blocks until the complete block was put in
+ * the buffer.
+ *
+ * @author WD
+ */
+public class PipedOutputStream extends java.io.PipedOutputStream {
+
+  /** The connected reader, or <code>null</code> while unconnected. */
+  PipedInputStream sink;
+
+  /**
+   * Creates an unconnected PipedOutputStream.
+   */
+  protected PipedOutputStream() {
+    this(null);
+  }
+
+  /**
+   * Creates a PipedOutputStream with a default buffer size and connects it to
+   * <code>sink</code>, unless <code>sink</code> is <code>null</code>.
+   *
+   * @param sink the reading end, or <code>null</code> to stay unconnected.
+   */
+  public PipedOutputStream(PipedInputStream sink) {
+    this(sink, ConfigurationImpl.DEFAULT_BUFFER_SIZE);
+  }
+
+  /**
+   * Creates a PipedOutputStream and connects it to <code>sink</code>, whose
+   * buffer is sized to <code>bufferSize</code>. If <code>sink</code> is
+   * <code>null</code> the stream stays unconnected and
+   * <code>bufferSize</code> is ignored.
+   *
+   * @param sink the reading end, or <code>null</code> to stay unconnected.
+   * @param bufferSize capacity of the sink's circular buffer in bytes.
+   * @exception IllegalArgumentException
+   *   <code>sink</code> is given and <code>bufferSize</code> is not positive.
+   * @exception IllegalStateException
+   *   Connecting to <code>sink</code> failed.
+   */
+  public PipedOutputStream(PipedInputStream sink, int bufferSize) {
+    if (sink != null) {
+      if (bufferSize <= 0) {
+        throw new IllegalArgumentException("Buffer size must be positive: " + bufferSize);
+      }
+
+      try {
+        connect(sink);
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
+      }
+
+      synchronized (sink.sync) {
+        // Keep a buffer which already has the requested size instead of
+        // allocating a second one.
+        if (sink.buffer == null || sink.buffer.length != bufferSize) {
+          sink.buffer = new byte[bufferSize];
+        }
+      }
+    }
+  }
+
+  /**
+   * Marks the pipe as closed and wakes up every thread waiting on it.
+   * Readers can still drain what is left in the buffer.
+   *
+   * @exception IOException
+   *   The pipe is not connected.
+   */
+  @Override
+  public void close() throws IOException {
+    if (sink == null) {
+      throw new IOException("Unconnected pipe");
+    }
+
+    synchronized (sink.sync) {
+      sink.closed = true;
+      flush();
+    }
+  }
+
+  /**
+   * Connects this stream to <code>sink</code>.
+   *
+   * @exception IOException
+   *   The pipe is already connected.
+   */
+  public void connect(PipedInputStream sink) throws IOException {
+    if (this.sink != null) {
+      throw new IOException("Pipe already connected");
+    }
+
+    this.sink = sink;
+    sink.source = this;
+  }
+
+  /**
+   * Wakes up all threads waiting on the pipe. Does nothing while the pipe is
+   * unconnected.
+   */
+  @Override
+  public void flush() throws IOException {
+    if (sink == null) {
+      // Nothing to wake up yet.
+      return;
+    }
+
+    synchronized (sink.sync) {
+      // Release all readers.
+      sink.sync.notifyAll();
+    }
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    write(new byte[] { (byte) b });
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    write(b, 0, b.length);
+  }
+
+  /**
+   * Writes <code>len</code> bytes of <code>b</code> starting at
+   * <code>off</code>, blocking while the circular buffer is full.
+   *
+   * @exception IOException
+   *   The pipe is not connected, it was closed, or the waiting thread was
+   *   interrupted.
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    if (sink == null) {
+      throw new IOException("Unconnected pipe");
+    }
+
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+
+    synchronized (sink.sync) {
+      int written = 0;
+      // True while bytes were stored which waiting readers don't know of.
+      boolean unannounced = false;
+
+      while (true) {
+        // Checked under the monitor and again after every wake-up.
+        if (sink.closed) {
+          throw new IOException("Broken pipe");
+        }
+
+        if (written == len) {
+          break;
+        }
+
+        if (sink.writePosition == sink.readPosition
+            && sink.writeLaps > sink.readLaps) {
+          // The circular buffer is full. Readers blocked on an empty buffer
+          // must learn about the bytes stored so far, otherwise both sides
+          // would wait forever.
+          if (unannounced) {
+            sink.sync.notifyAll();
+            unannounced = false;
+          }
+
+          // Wait for some reader to consume something.
+          try {
+            sink.sync.wait();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException(e.getMessage(), e);
+          }
+
+          continue;
+        }
+
+        // Don't write more than what is left of the block or the space
+        // available contiguously in the circular buffer.
+        int limit = sink.writePosition < sink.readPosition
+            ? sink.readPosition
+            : sink.buffer.length;
+        int amount = Math.min(len - written, limit - sink.writePosition);
+
+        System.arraycopy(b, off + written, sink.buffer, sink.writePosition, amount);
+        sink.writePosition += amount;
+        written += amount;
+        unannounced = true;
+
+        if (sink.writePosition == sink.buffer.length) {
+          // A lap was completed, so go back.
+          sink.writePosition = 0;
+          ++sink.writeLaps;
+        }
+      }
+
+      // The complete desired block was written.
+      sink.sync.notifyAll();
+    }
+  }
+
+}
\ No newline at end of file

Reply via email to