This is an automated email from the ASF dual-hosted git repository. wujimin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
commit fde6f4dc8c52360476dcfe0b4845a17abae2a758 Author: heyile <[email protected]> AuthorDate: Thu Nov 22 20:14:59 2018 +0800 [SCB-968]968 http2 do not support pump download --- .../foundation/vertx/stream/PumpFactoryImpl.java | 40 ++++++++ .../foundation/vertx/stream/PumpImpl.java | 108 +++++++++++++++++++++ .../services/io.vertx.core.spi.PumpFactory | 1 + .../vertx/stream/TestPumpFactoryImpl.java | 33 +++++++ .../foundation/vertx/stream/TestPumpImpl.java | 50 ++++++++++ .../org/apache/servicecomb/it/ConsumerMain.java | 5 +- .../servicecomb/it/schema/DownloadSchema.java | 1 - 7 files changed, 233 insertions(+), 5 deletions(-) diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFactoryImpl.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFactoryImpl.java new file mode 100644 index 0000000..c3284e3 --- /dev/null +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFactoryImpl.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.foundation.vertx.stream; + +import java.util.Objects; + +import io.vertx.core.spi.PumpFactory; +import io.vertx.core.streams.Pump; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; + +public class PumpFactoryImpl implements PumpFactory { + @Override + public <T> Pump pump(ReadStream<T> rs, WriteStream<T> ws) { + Objects.requireNonNull(rs); + Objects.requireNonNull(ws); + return new PumpImpl<>(rs, ws); + } + + @Override + public <T> Pump pump(ReadStream<T> rs, WriteStream<T> ws, int writeQueueMaxSize) { + Objects.requireNonNull(rs); + Objects.requireNonNull(ws); + return new PumpImpl<>(rs, ws, writeQueueMaxSize); + } +} \ No newline at end of file diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpImpl.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpImpl.java new file mode 100644 index 0000000..3657e73 --- /dev/null +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpImpl.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.foundation.vertx.stream; + +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.Pump; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; + +public class PumpImpl<T> implements Pump { + + private final ReadStream<T> readStream; + + private final WriteStream<T> writeStream; + + private final Handler<T> dataHandler; + + private final Handler<Void> drainHandler; + + private int pumped; + + public PumpImpl(ReadStream<T> readStream, WriteStream<T> writeStream, int maxWriteQueueSize) { + this(readStream, writeStream); + this.writeStream.setWriteQueueMaxSize(maxWriteQueueSize); + } + + public PumpImpl(ReadStream<T> readStream, WriteStream<T> writeStream) { + this.readStream = readStream; + this.writeStream = writeStream; + drainHandler = v -> readStream.resume(); + dataHandler = data -> { + if (data instanceof Buffer) { + if (((Buffer) data).length() == 0) { + return; + } + } + writeStream.write(data); + incPumped(); + if (writeStream.writeQueueFull()) { + readStream.pause(); + writeStream.drainHandler(drainHandler); + } + }; + } + + + /** + * Set the write queue max size to {@code maxSize} + */ + @Override + public PumpImpl<T> setWriteQueueMaxSize(int maxSize) { + writeStream.setWriteQueueMaxSize(maxSize); + return this; + } + + /** + * Start the Pump. The Pump can be started and stopped multiple times. + */ + @Override + public PumpImpl<T> start() { + readStream.handler(dataHandler); + return this; + } + + /** + * Stop the Pump. The Pump can be started and stopped multiple times. + */ + @Override + public PumpImpl<T> stop() { + writeStream.drainHandler(null); + readStream.handler(null); + return this; + } + + /** + * Return the total number of elements pumped by this pump. + */ + @Override + public synchronized int numberPumped() { + return pumped; + } + + // Note we synchronize as numberPumped can be called from a different thread however incPumped will always + // be called from the same thread so we benefit from bias locked optimisation which should give a very low + // overhead + private synchronized void incPumped() { + pumped++; + } + + public Handler<T> getDataHandler() { + return dataHandler; + } +} diff --git a/foundations/foundation-vertx/src/main/resources/META-INF/services/io.vertx.core.spi.PumpFactory b/foundations/foundation-vertx/src/main/resources/META-INF/services/io.vertx.core.spi.PumpFactory new file mode 100644 index 0000000..1d25eef --- /dev/null +++ b/foundations/foundation-vertx/src/main/resources/META-INF/services/io.vertx.core.spi.PumpFactory @@ -0,0 +1 @@ +org.apache.servicecomb.foundation.vertx.stream.PumpFactoryImpl \ No newline at end of file diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFactoryImpl.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFactoryImpl.java new file mode 100644 index 0000000..f86f46d --- /dev/null +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFactoryImpl.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.foundation.vertx.stream; + +import org.junit.Assert; +import org.junit.Test; + +import io.vertx.core.streams.Pump; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; +import mockit.Mocked; + +public class TestPumpFactoryImpl { + @Test + public void pump(@Mocked ReadStream<Object> rs, @Mocked WriteStream<Object> ws) { + Pump pump = Pump.pump(rs, ws); + Assert.assertTrue(pump instanceof PumpImpl); + } +} diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpImpl.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpImpl.java new file mode 100644 index 0000000..a2ccb17 --- /dev/null +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpImpl.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.foundation.vertx.stream; + +import org.junit.Assert; +import org.junit.Test; + +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; +import mockit.Expectations; +import mockit.Mocked; + +public class TestPumpImpl { + + @Test + public void testPumpWithPending(@Mocked ReadStream<Object> rs, @Mocked WriteStream<Object> ws, @Mocked Buffer zeroBuf, + @Mocked Buffer contentBuf) { + PumpImpl<Object> pump = new PumpImpl<>(rs, ws); + Handler<Object> handler = pump.getDataHandler(); + new Expectations() { + { + zeroBuf.length(); + result = 0; + contentBuf.length(); + result = 1; + } + }; + handler.handle(zeroBuf); + handler.handle(contentBuf); + Assert.assertEquals(1, pump.numberPumped()); + handler.handle(contentBuf); + Assert.assertEquals(2, pump.numberPumped()); + } +} diff --git a/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/ConsumerMain.java b/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/ConsumerMain.java index 2209583..0ef5039 100644 --- a/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/ConsumerMain.java +++ b/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/ConsumerMain.java @@ -103,10 +103,7 @@ public class ConsumerMain { // only rest support default value feature ITJUnitUtils.runWithRest(TestDefaultValue.class); - // currently have bug with http2 - if (!ITJUnitUtils.getProducerName().endsWith("-h2") && !ITJUnitUtils.getProducerName().endsWith("-h2c")) { - ITJUnitUtils.runWithRest(TestDownload.class); - } + ITJUnitUtils.runWithRest(TestDownload.class); ITJUnitUtils.runWithHighwayAndRest(TestTrace.class); ITJUnitUtils.run(TestTraceEdge.class); diff --git a/integration-tests/it-producer/src/main/java/org/apache/servicecomb/it/schema/DownloadSchema.java b/integration-tests/it-producer/src/main/java/org/apache/servicecomb/it/schema/DownloadSchema.java index b1123b6..5ba7d8b 100644 --- a/integration-tests/it-producer/src/main/java/org/apache/servicecomb/it/schema/DownloadSchema.java +++ b/integration-tests/it-producer/src/main/java/org/apache/servicecomb/it/schema/DownloadSchema.java @@ -197,7 +197,6 @@ public class DownloadSchema implements BootListener { .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_PLAIN_VALUE) .header(HttpHeaders.CONTENT_DISPOSITION, "attachment;filename=netInputStream.txt") .body(conn.getInputStream()); - conn.disconnect(); return responseEntity; }
