Author: ravi
Date: Sun Aug 28 20:09:19 2016
New Revision: 1758147
URL: http://svn.apache.org/viewvc?rev=1758147&view=rev
Log:
Fix by Senduran for SYNAPSE-1015
Added:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/ControlledByteBuffer.java
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/Pipe.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetContext.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/BufferFactory.java
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/Pipe.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/Pipe.java?rev=1758147&r1=1758146&r2=1758147&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/Pipe.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/Pipe.java
Sun Aug 28 20:09:19 2016
@@ -23,10 +23,9 @@ import org.apache.http.nio.IOControl;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.ContentEncoder;
import org.apache.synapse.transport.passthru.config.BaseConfiguration;
+import org.apache.synapse.transport.passthru.util.ControlledByteBuffer;
import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -43,9 +42,9 @@ public class Pipe {
private IOControl consumerIoControl;
/** Fixed size buffer to read and write data */
- private ByteBuffer buffer;
+ private ControlledByteBuffer buffer;
- private ByteBuffer outputBuffer;
+ private ControlledByteBuffer outputBuffer;
private boolean producerCompleted = false;
@@ -70,13 +69,10 @@ public class Pipe {
private boolean hasHttpProducer = true;
- private AtomicBoolean inBufferInputMode = new AtomicBoolean(true);
- private AtomicBoolean outBufferInputMode;
-
private ByteBufferInputStream inputStream;
private ByteBufferOutputStream outputStream;
- public Pipe(IOControl producerIoControl, ByteBuffer buffer,
+ public Pipe(IOControl producerIoControl, ControlledByteBuffer buffer,
String name, BaseConfiguration baseConfig) {
this.producerIoControl = producerIoControl;
this.buffer = buffer;
@@ -84,7 +80,7 @@ public class Pipe {
this.baseConfig = baseConfig;
}
- public Pipe(ByteBuffer buffer, String name, BaseConfiguration baseConfig) {
+ public Pipe(ControlledByteBuffer buffer, String name, BaseConfiguration baseConfig) {
this.buffer = buffer;
this.name += "_" + name;
this.baseConfig = baseConfig;
@@ -118,14 +114,11 @@ public class Pipe {
}
lock.lock();
- ByteBuffer consumerBuffer;
- AtomicBoolean inputMode;
+ ControlledByteBuffer consumerBuffer;
if (outputBuffer != null) {
consumerBuffer = outputBuffer;
- inputMode = outBufferInputMode;
} else {
consumerBuffer = buffer;
- inputMode = inBufferInputMode;
}
try {
// if producer at error we have to stop the encoding and return immediately
@@ -134,9 +127,9 @@ public class Pipe {
return -1;
}
- setOutputMode(consumerBuffer, inputMode);
- int bytesWritten = encoder.write(consumerBuffer);
- setInputMode(consumerBuffer, inputMode);
+ setOutputMode(consumerBuffer);
+ int bytesWritten = encoder.write(consumerBuffer.getByteBuffer());
+ setInputMode(consumerBuffer);
if (consumerBuffer.position() == 0) {
if (outputBuffer == null) {
@@ -179,8 +172,8 @@ public class Pipe {
lock.lock();
try {
- setInputMode(buffer, inBufferInputMode);
- int bytesRead = decoder.read(buffer);
+ setInputMode(buffer);
+ int bytesRead = decoder.read(buffer.getByteBuffer());
// if consumer is at error we have to let the producer complete
if (consumerError) {
@@ -258,7 +251,6 @@ public class Pipe {
public synchronized OutputStream getOutputStream() {
if (outputStream == null) {
outputBuffer = baseConfig.getBufferFactory().getBuffer();
- outBufferInputMode = new AtomicBoolean(true);
outputStream = new ByteBufferOutputStream();
}
return outputStream;
@@ -267,7 +259,7 @@ public class Pipe {
public synchronized void setSerializationComplete(boolean serializationComplete) {
if (!this.serializationComplete) {
this.serializationComplete = serializationComplete;
- if (consumerIoControl != null && hasData(outputBuffer, outBufferInputMode)) {
+ if (consumerIoControl != null && hasData(outputBuffer)) {
consumerIoControl.requestOutput();
}
}
@@ -284,7 +276,7 @@ public class Pipe {
this.rawSerializationComplete = rawSerializationComplete;
}
- public ByteBuffer getBuffer() {
+ public ControlledByteBuffer getBuffer() {
return buffer;
}
@@ -292,8 +284,8 @@ public class Pipe {
return serializationComplete;
}
- private void setInputMode(ByteBuffer buffer, AtomicBoolean inputMode) {
- if (inputMode.compareAndSet(false, true)) {
+ private void setInputMode(ControlledByteBuffer buffer) {
+ if (buffer.setInputMode()) {
if (buffer.hasRemaining()) {
buffer.compact();
} else {
@@ -302,16 +294,16 @@ public class Pipe {
}
}
- private void setOutputMode(ByteBuffer buffer, AtomicBoolean inputMode) {
- if (inputMode.compareAndSet(true, false)) {
+ private void setOutputMode(ControlledByteBuffer buffer) {
+ if (buffer.setOutputMode()) {
buffer.flip();
}
}
- private boolean hasData(ByteBuffer buffer, AtomicBoolean inputMode) {
+ private boolean hasData(ControlledByteBuffer buffer) {
lock.lock();
try {
- setOutputMode(buffer, inputMode);
+ setOutputMode(buffer);
return buffer.hasRemaining();
} finally {
lock.unlock();
@@ -324,7 +316,7 @@ public class Pipe {
public int read() throws IOException {
lock.lock();
try {
- if (!hasData(buffer, inBufferInputMode)) {
+ if (!hasData(buffer)) {
waitForData();
if (producerError) {
return -1;
@@ -346,13 +338,13 @@ public class Pipe {
lock.lock();
try {
- if (!hasData(buffer, inBufferInputMode)) {
+ if (!hasData(buffer)) {
waitForData();
}
if (isEndOfStream()) {
return -1;
}
- setOutputMode(buffer, inBufferInputMode);
+ setOutputMode(buffer);
int chunk = len;
if (chunk > buffer.remaining()) {
chunk = buffer.remaining();
@@ -368,7 +360,7 @@ public class Pipe {
lock.lock();
try {
try {
- while (!hasData(buffer, inBufferInputMode) && !producerCompleted) {
+ while (!hasData(buffer) && !producerCompleted) {
if (producerError) {
break;
}
@@ -384,7 +376,7 @@ public class Pipe {
}
private boolean isEndOfStream() {
- return !hasData(buffer, inBufferInputMode) && producerCompleted;
+ return !hasData(buffer) && producerCompleted;
}
}
@@ -394,10 +386,10 @@ public class Pipe {
public void write(int b) throws IOException {
lock.lock();
try {
- setInputMode(outputBuffer, outBufferInputMode);
+ setInputMode(outputBuffer);
if (!outputBuffer.hasRemaining()) {
flushContent();
- setInputMode(outputBuffer, outBufferInputMode);
+ setInputMode(outputBuffer);
}
outputBuffer.put((byte) b);
} finally {
@@ -411,7 +403,7 @@ public class Pipe {
}
lock.lock();
try {
- setInputMode(outputBuffer, outBufferInputMode);
+ setInputMode(outputBuffer);
int remaining = len;
while (remaining > 0) {
if (!outputBuffer.hasRemaining()) {
@@ -420,7 +412,7 @@ public class Pipe {
buffer.clear();
break;
}
- setInputMode(outputBuffer, outBufferInputMode);
+ setInputMode(outputBuffer);
}
int chunk = Math.min(remaining, outputBuffer.remaining());
outputBuffer.put(b, off, chunk);
@@ -441,7 +433,7 @@ public class Pipe {
try {
try {
- while (hasData(outputBuffer, outBufferInputMode)) {
+ while (hasData(outputBuffer)) {
if(consumerError) {
break;
}
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java?rev=1758147&r1=1758146&r2=1758147&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
Sun Aug 28 20:09:19 2016
@@ -21,6 +21,7 @@ package org.apache.synapse.transport.pas
import org.apache.http.nio.NHttpConnection;
import org.apache.synapse.transport.passthru.config.SourceConfiguration;
+import org.apache.synapse.transport.passthru.util.ControlledByteBuffer;
import java.nio.ByteBuffer;
@@ -85,7 +86,7 @@ public class SourceContext {
}
if (writer != null) {
- ByteBuffer buffer = writer.getBuffer();
+ ControlledByteBuffer buffer = writer.getBuffer();
buffer.clear();
sourceConfiguration.getBufferFactory().release(buffer);
}
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetContext.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetContext.java?rev=1758147&r1=1758146&r2=1758147&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetContext.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetContext.java
Sun Aug 28 20:09:19 2016
@@ -22,6 +22,7 @@ package org.apache.synapse.transport.pas
import org.apache.axis2.context.MessageContext;
import org.apache.http.nio.NHttpConnection;
import org.apache.synapse.transport.passthru.config.TargetConfiguration;
+import org.apache.synapse.transport.passthru.util.ControlledByteBuffer;
import java.nio.ByteBuffer;
@@ -114,7 +115,7 @@ public class TargetContext {
}
if (writer != null) {
- ByteBuffer buffer = writer.getBuffer();
+ ControlledByteBuffer buffer = writer.getBuffer();
buffer.clear();
targetConfiguration.getBufferFactory().release(buffer);
}
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/BufferFactory.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/BufferFactory.java?rev=1758147&r1=1758146&r2=1758147&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/BufferFactory.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/BufferFactory.java
Sun Aug 28 20:09:19 2016
@@ -21,13 +21,12 @@ package org.apache.synapse.transport.pas
import org.apache.http.nio.util.ByteBufferAllocator;
import org.apache.http.nio.util.HeapByteBufferAllocator;
-import java.nio.ByteBuffer;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BufferFactory {
- private volatile ByteBuffer [] buffers;
+ private volatile ControlledByteBuffer [] buffers;
private volatile int marker = -1;
@@ -45,34 +44,36 @@ public class BufferFactory {
this.allocator = HeapByteBufferAllocator.INSTANCE;
}
- buffers = new ByteBuffer[size];
+ buffers = new ControlledByteBuffer[size];
}
- public ByteBuffer getBuffer() {
+ public ControlledByteBuffer getBuffer() {
if (marker == -1) {
- return allocator.allocate(bufferSize);
+ return new ControlledByteBuffer(allocator.allocate(bufferSize));
} else {
try {
lock.lock();
if (marker >= 0) {
- ByteBuffer b = buffers[marker];
- b.clear();
+ ControlledByteBuffer controlledByteBuffer = buffers[marker];
+ controlledByteBuffer.clear();
+ controlledByteBuffer.forceSetInputMode();
buffers[marker] = null;
marker--;
- return b;
+ return controlledByteBuffer;
}
} finally {
lock.unlock();
}
}
- return allocator.allocate(bufferSize);
+ return new ControlledByteBuffer(allocator.allocate(bufferSize));
}
- public void release(ByteBuffer buffer) {
+ public void release(ControlledByteBuffer buffer) {
try {
lock.lock();
if (marker < buffers.length - 1) {
buffer.clear();
+ buffer.forceSetInputMode();
buffers[++marker] = buffer;
}
} finally {
Added:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/ControlledByteBuffer.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/ControlledByteBuffer.java?rev=1758147&view=auto
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/ControlledByteBuffer.java
(added)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/ControlledByteBuffer.java
Sun Aug 28 20:09:19 2016
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.transport.passthru.util;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A {@link ByteBuffer} paired with a flag that records whether the buffer is
+ * currently in input mode (being filled) or output mode (being drained).
+ *
+ * <p>The flag is bookkeeping only: switching modes never touches the wrapped
+ * buffer. {@link #setInputMode()} and {@link #setOutputMode()} report whether
+ * the mode actually changed, so the caller can decide whether to
+ * {@link #flip()}, {@link #compact()} or {@link #clear()} the buffer.
+ * A new instance starts in input mode.
+ */
+public class ControlledByteBuffer {
+
+    /** The wrapped buffer that every data and position operation is delegated to. */
+    private final ByteBuffer byteBuffer;
+
+    /** {@code true} while in input mode, {@code false} while in output mode. */
+    private final AtomicBoolean inputMode = new AtomicBoolean(true);
+
+    /**
+     * Wraps the given buffer; the new instance starts in input mode.
+     *
+     * @param byteBuffer the buffer to wrap
+     */
+    public ControlledByteBuffer(ByteBuffer byteBuffer) {
+        this.byteBuffer = byteBuffer;
+    }
+
+    /**
+     * Returns the wrapped buffer itself (not a copy or a view).
+     *
+     * @return the underlying {@link ByteBuffer}
+     */
+    public ByteBuffer getByteBuffer() {
+        return this.byteBuffer;
+    }
+
+    /**
+     * Atomically switches from output mode to input mode.
+     *
+     * @return {@code true} if the mode changed, {@code false} if the buffer
+     *         was already in input mode
+     */
+    public boolean setInputMode() {
+        return this.inputMode.compareAndSet(false, true);
+    }
+
+    /**
+     * Atomically switches from input mode to output mode.
+     *
+     * @return {@code true} if the mode changed, {@code false} if the buffer
+     *         was already in output mode
+     */
+    public boolean setOutputMode() {
+        return this.inputMode.compareAndSet(true, false);
+    }
+
+    /**
+     * Unconditionally puts the buffer into input mode.
+     *
+     * <p>The existing flag is updated in place rather than replaced, so every
+     * thread keeps operating on the same {@link AtomicBoolean} and the change
+     * is published with the flag's own volatile-write semantics.
+     */
+    public void forceSetInputMode() {
+        this.inputMode.set(true);
+    }
+
+    /** Delegates to the wrapped buffer's {@code flip()}; the mode flag is not changed. */
+    public void flip() {
+        this.byteBuffer.flip();
+    }
+
+    /** Delegates to the wrapped buffer's {@code clear()}; the mode flag is not changed. */
+    public void clear() {
+        this.byteBuffer.clear();
+    }
+
+    /** Delegates to the wrapped buffer's {@code compact()}; the mode flag is not changed. */
+    public void compact() {
+        this.byteBuffer.compact();
+    }
+
+    /**
+     * @return the wrapped buffer's current position
+     */
+    public int position() {
+        return this.byteBuffer.position();
+    }
+
+    /**
+     * Writes one byte at the wrapped buffer's current position.
+     *
+     * @param b the byte to write
+     */
+    public void put(byte b) {
+        this.byteBuffer.put(b);
+    }
+
+    /**
+     * Writes a four-byte int at the wrapped buffer's current position.
+     *
+     * @param value the int to write
+     */
+    public void putInt(int value) {
+        this.byteBuffer.putInt(value);
+    }
+
+    /**
+     * Copies {@code length} bytes from {@code src}, starting at {@code offset},
+     * into the wrapped buffer.
+     *
+     * @param src    the array to read from
+     * @param offset index of the first byte to copy
+     * @param length number of bytes to copy
+     * @return the wrapped buffer
+     */
+    public ByteBuffer put(byte[] src, int offset, int length) {
+        return this.byteBuffer.put(src, offset, length);
+    }
+
+    /**
+     * @return {@code true} if the wrapped buffer has elements between its
+     *         position and its limit
+     */
+    public boolean hasRemaining() {
+        return this.byteBuffer.hasRemaining();
+    }
+
+    /**
+     * Reads one byte at the wrapped buffer's current position.
+     *
+     * @return the byte read
+     */
+    public byte get() {
+        return this.byteBuffer.get();
+    }
+
+    /**
+     * Copies {@code length} bytes from the wrapped buffer into {@code dst},
+     * starting at {@code offset}.
+     *
+     * @param dst    the array to write into
+     * @param offset index in {@code dst} of the first byte written
+     * @param length number of bytes to copy
+     * @return the wrapped buffer
+     */
+    public ByteBuffer get(byte[] dst, int offset, int length) {
+        return this.byteBuffer.get(dst, offset, length);
+    }
+
+    /**
+     * @return the number of elements between the wrapped buffer's position
+     *         and its limit
+     */
+    public int remaining() {
+        return this.byteBuffer.remaining();
+    }
+}