http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java index 6c9a4cb..d8ed121 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java @@ -20,35 +20,25 @@ package org.apache.beam.runners.core; import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.collection.IsEmptyCollection.empty; import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; -import com.google.common.base.Suppliers; -import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.ServiceLoader; +import java.util.Map; import org.apache.beam.fn.harness.fn.ThrowingConsumer; -import org.apache.beam.fn.harness.fn.ThrowingRunnable; -import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; -import org.hamcrest.Matchers; -import org.hamcrest.collection.IsMapContaining; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -56,25 +46,27 @@ import org.junit.runners.JUnit4; /** Tests for {@link BoundedSourceRunner}. */ @RunWith(JUnit4.class) public class BoundedSourceRunnerTest { - - public static final String URN = "urn:org.apache.beam:source:java:0.1"; - @Test public void testRunReadLoopWithMultipleSources() throws Exception { - List<WindowedValue<Long>> out1Values = new ArrayList<>(); + List<WindowedValue<Long>> out1ValuesA = new ArrayList<>(); + List<WindowedValue<Long>> out1ValuesB = new ArrayList<>(); List<WindowedValue<Long>> out2Values = new ArrayList<>(); - Collection<ThrowingConsumer<WindowedValue<Long>>> consumers = - ImmutableList.of(out1Values::add, out2Values::add); + Map<String, Collection<ThrowingConsumer<WindowedValue<Long>>>> outputMap = ImmutableMap.of( + "out1", ImmutableList.of(out1ValuesA::add, out1ValuesB::add), + "out2", ImmutableList.of(out2Values::add)); - BoundedSourceRunner<BoundedSource<Long>, Long> runner = new BoundedSourceRunner<>( + BoundedSourceRunner<BoundedSource<Long>, Long> runner = + new BoundedSourceRunner<>( PipelineOptionsFactory.create(), RunnerApi.FunctionSpec.getDefaultInstance(), - consumers); + outputMap); runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(2))); runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(1))); - assertThat(out1Values, + assertThat(out1ValuesA, + contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L))); + assertThat(out1ValuesB, contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L))); assertThat(out2Values, contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L))); @@ -82,106 +74,40 @@ public class BoundedSourceRunnerTest { @Test public void testRunReadLoopWithEmptySource() throws Exception { - List<WindowedValue<Long>> outValues = new ArrayList<>(); - Collection<ThrowingConsumer<WindowedValue<Long>>> consumers = - ImmutableList.of(outValues::add); + List<WindowedValue<Long>> out1Values = new ArrayList<>(); + Map<String, Collection<ThrowingConsumer<WindowedValue<Long>>>> outputMap = ImmutableMap.of( + "out1", ImmutableList.of(out1Values::add)); - BoundedSourceRunner<BoundedSource<Long>, Long> runner = new BoundedSourceRunner<>( + BoundedSourceRunner<BoundedSource<Long>, Long> runner = + new BoundedSourceRunner<>( PipelineOptionsFactory.create(), RunnerApi.FunctionSpec.getDefaultInstance(), - consumers); + outputMap); runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(0))); - assertThat(outValues, empty()); + assertThat(out1Values, empty()); } @Test public void testStart() throws Exception { List<WindowedValue<Long>> outValues = new ArrayList<>(); - Collection<ThrowingConsumer<WindowedValue<Long>>> consumers = - ImmutableList.of(outValues::add); + Map<String, Collection<ThrowingConsumer<WindowedValue<Long>>>> outputMap = ImmutableMap.of( + "out", ImmutableList.of(outValues::add)); ByteString encodedSource = ByteString.copyFrom(SerializableUtils.serializeToByteArray(CountingSource.upTo(3))); - BoundedSourceRunner<BoundedSource<Long>, Long> runner = new BoundedSourceRunner<>( + BoundedSourceRunner<BoundedSource<Long>, Long> runner = + new BoundedSourceRunner<>( PipelineOptionsFactory.create(), - RunnerApi.FunctionSpec.newBuilder().setParameter( + RunnerApi.FunctionSpec.newBuilder().setParameter( Any.pack(BytesValue.newBuilder().setValue(encodedSource).build())).build(), - consumers); + outputMap); runner.start(); assertThat(outValues, contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(2L))); } - - @Test - public void testCreatingAndProcessingSourceFromFactory() throws Exception { - List<WindowedValue<String>> outputValues = new ArrayList<>(); - - Multimap<String, ThrowingConsumer<WindowedValue<?>>> consumers = HashMultimap.create(); - consumers.put("outputPC", - (ThrowingConsumer) (ThrowingConsumer<WindowedValue<String>>) outputValues::add); - List<ThrowingRunnable> startFunctions = new ArrayList<>(); - List<ThrowingRunnable> finishFunctions = new ArrayList<>(); - - RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder() - .setUrn("urn:org.apache.beam:source:java:0.1") - .setParameter(Any.pack(BytesValue.newBuilder() - .setValue(ByteString.copyFrom( - SerializableUtils.serializeToByteArray(CountingSource.upTo(3)))) - .build())) - .build(); - - RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder() - .setSpec(functionSpec) - .putInputs("input", "inputPC") - .putOutputs("output", "outputPC") - .build(); - - new BoundedSourceRunner.Factory<>().createRunnerForPTransform( - PipelineOptionsFactory.create(), - null /* beamFnDataClient */, - "pTransformId", - pTransform, - Suppliers.ofInstance("57L")::get, - ImmutableMap.of(), - ImmutableMap.of(), - consumers, - startFunctions::add, - finishFunctions::add); - - // This is testing a deprecated way of running sources and should be removed - // once all source definitions are instead propagated along the input edge. - Iterables.getOnlyElement(startFunctions).run(); - assertThat(outputValues, contains( - valueInGlobalWindow(0L), - valueInGlobalWindow(1L), - valueInGlobalWindow(2L))); - outputValues.clear(); - - // Check that when passing a source along as an input, the source is processed. - assertThat(consumers.keySet(), containsInAnyOrder("inputPC", "outputPC")); - Iterables.getOnlyElement(consumers.get("inputPC")).accept( - valueInGlobalWindow(CountingSource.upTo(2))); - assertThat(outputValues, contains( - valueInGlobalWindow(0L), - valueInGlobalWindow(1L))); - - assertThat(finishFunctions, Matchers.empty()); - } - - @Test - public void testRegistration() { - for (Registrar registrar : - ServiceLoader.load(Registrar.class)) { - if (registrar instanceof BoundedSourceRunner.Registrar) { - assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey(URN)); - return; - } - } - fail("Expected registrar not found."); - } }
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java deleted file mode 100644 index c4df77a..0000000 --- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core; - -import static org.apache.beam.sdk.util.WindowedValue.timestampedValueInGlobalWindow; -import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Suppliers; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; -import com.google.protobuf.Any; -import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; -import com.google.protobuf.Message; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.ServiceLoader; -import org.apache.beam.fn.harness.fn.ThrowingConsumer; -import org.apache.beam.fn.harness.fn.ThrowingRunnable; -import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar; -import org.apache.beam.runners.core.construction.ParDoTranslation; -import org.apache.beam.runners.dataflow.util.CloudObjects; -import org.apache.beam.runners.dataflow.util.DoFnInfo; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.hamcrest.collection.IsMapContaining; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link FnApiDoFnRunner}. */ -@RunWith(JUnit4.class) -public class FnApiDoFnRunnerTest { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final Coder<WindowedValue<String>> STRING_CODER = - WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE); - private static final String STRING_CODER_SPEC_ID = "999L"; - private static final RunnerApi.Coder STRING_CODER_SPEC; - - static { - try { - STRING_CODER_SPEC = RunnerApi.Coder.newBuilder() - .setSpec(RunnerApi.SdkFunctionSpec.newBuilder() - .setSpec(RunnerApi.FunctionSpec.newBuilder() - .setParameter(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom( - OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(STRING_CODER)))) - .build()))) - .build()) - .build(); - } catch (IOException e) { - throw new ExceptionInInitializerError(e); - } - } - - private static class TestDoFn extends DoFn<String, String> { - private static final TupleTag<String> mainOutput = new TupleTag<>("mainOutput"); - private static final TupleTag<String> additionalOutput = new TupleTag<>("output"); - - private BoundedWindow window; - - @ProcessElement - public void processElement(ProcessContext context, BoundedWindow window) { - context.output("MainOutput" + context.element()); - context.output(additionalOutput, "AdditionalOutput" + context.element()); - this.window = window; - } - - @FinishBundle - public void finishBundle(FinishBundleContext context) { - if (window != null) { - context.output("FinishBundle", window.maxTimestamp(), window); - window = null; - } - } - } - - /** - * Create a DoFn that has 3 inputs (inputATarget1, inputATarget2, inputBTarget) and 2 outputs - * (mainOutput, output). Validate that inputs are fed to the {@link DoFn} and that outputs - * are directed to the correct consumers. - */ - @Test - public void testCreatingAndProcessingDoFn() throws Exception { - Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC); - String pTransformId = "pTransformId"; - String mainOutputId = "101"; - String additionalOutputId = "102"; - - DoFnInfo<?, ?> doFnInfo = DoFnInfo.forFn( - new TestDoFn(), - WindowingStrategy.globalDefault(), - ImmutableList.of(), - StringUtf8Coder.of(), - Long.parseLong(mainOutputId), - ImmutableMap.of( - Long.parseLong(mainOutputId), TestDoFn.mainOutput, - Long.parseLong(additionalOutputId), TestDoFn.additionalOutput)); - RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder() - .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN) - .setParameter(Any.pack(BytesValue.newBuilder() - .setValue(ByteString.copyFrom(SerializableUtils.serializeToByteArray(doFnInfo))) - .build())) - .build(); - RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder() - .setSpec(functionSpec) - .putInputs("inputA", "inputATarget") - .putInputs("inputB", "inputBTarget") - .putOutputs(mainOutputId, "mainOutputTarget") - .putOutputs(additionalOutputId, "additionalOutputTarget") - .build(); - - List<WindowedValue<String>> mainOutputValues = new ArrayList<>(); - List<WindowedValue<String>> additionalOutputValues = new ArrayList<>(); - Multimap<String, ThrowingConsumer<WindowedValue<?>>> consumers = HashMultimap.create(); - consumers.put("mainOutputTarget", - (ThrowingConsumer) (ThrowingConsumer<WindowedValue<String>>) mainOutputValues::add); - consumers.put("additionalOutputTarget", - (ThrowingConsumer) (ThrowingConsumer<WindowedValue<String>>) additionalOutputValues::add); - List<ThrowingRunnable> startFunctions = new ArrayList<>(); - List<ThrowingRunnable> finishFunctions = new ArrayList<>(); - - new FnApiDoFnRunner.Factory<>().createRunnerForPTransform( - PipelineOptionsFactory.create(), - null /* beamFnDataClient */, - pTransformId, - pTransform, - Suppliers.ofInstance("57L")::get, - ImmutableMap.of(), - ImmutableMap.of(), - consumers, - startFunctions::add, - finishFunctions::add); - - Iterables.getOnlyElement(startFunctions).run(); - mainOutputValues.clear(); - - assertThat(consumers.keySet(), containsInAnyOrder( - "inputATarget", "inputBTarget", "mainOutputTarget", "additionalOutputTarget")); - - Iterables.getOnlyElement(consumers.get("inputATarget")).accept(valueInGlobalWindow("A1")); - Iterables.getOnlyElement(consumers.get("inputATarget")).accept(valueInGlobalWindow("A2")); - Iterables.getOnlyElement(consumers.get("inputATarget")).accept(valueInGlobalWindow("B")); - assertThat(mainOutputValues, contains( - valueInGlobalWindow("MainOutputA1"), - valueInGlobalWindow("MainOutputA2"), - valueInGlobalWindow("MainOutputB"))); - assertThat(additionalOutputValues, contains( - valueInGlobalWindow("AdditionalOutputA1"), - valueInGlobalWindow("AdditionalOutputA2"), - valueInGlobalWindow("AdditionalOutputB"))); - mainOutputValues.clear(); - additionalOutputValues.clear(); - - Iterables.getOnlyElement(finishFunctions).run(); - assertThat( - mainOutputValues, - contains( - timestampedValueInGlobalWindow("FinishBundle", GlobalWindow.INSTANCE.maxTimestamp()))); - mainOutputValues.clear(); - } - - @Test - public void testRegistration() { - for (Registrar registrar : - ServiceLoader.load(Registrar.class)) { - if (registrar instanceof FnApiDoFnRunner.Registrar) { - assertThat(registrar.getPTransformRunnerFactories(), - IsMapContaining.hasKey(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)); - return; - } - } - fail("Expected registrar not found."); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/amqp/pom.xml b/sdks/java/io/amqp/pom.xml deleted file mode 100644 index 8da9448..0000000 --- a/sdks/java/io/amqp/pom.xml +++ /dev/null @@ -1,100 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-io-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <artifactId>beam-sdks-java-io-amqp</artifactId> - <name>Apache Beam :: SDKs :: Java :: IO :: AMQP</name> - <description>IO to read and write using AMQP 1.0 protocol (http://www.amqp.org).</description> - - <dependencies> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-core</artifactId> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - - <dependency> - <groupId>joda-time</groupId> - <artifactId>joda-time</artifactId> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - - <dependency> - <groupId>com.google.code.findbugs</groupId> - <artifactId>jsr305</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>proton-j</artifactId> - <version>0.13.1</version> - </dependency> - - <!-- compile dependencies --> - <dependency> - <groupId>com.google.auto.value</groupId> - <artifactId>auto-value</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>com.google.auto.service</groupId> - <artifactId>auto-service</artifactId> - <optional>true</optional> - </dependency> - - <!-- test dependencies --> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-jdk14</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-all</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-direct-java</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java deleted file mode 100644 index 1f307b2..0000000 --- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java +++ /dev/null @@ -1,399 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.amqp; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - -import com.google.auto.value.AutoValue; -import com.google.common.base.Joiner; - -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.NoSuchElementException; - -import javax.annotation.Nullable; - -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.qpid.proton.message.Message; -import org.apache.qpid.proton.messenger.Messenger; -import org.apache.qpid.proton.messenger.Tracker; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * AmqpIO supports AMQP 1.0 protocol using the Apache QPid Proton-J library. - * - * <p>It's also possible to use AMQP 1.0 protocol via Apache Qpid JMS connection factory and the - * Apache Beam JmsIO. - * - * <h3>Binding AMQP and receive messages</h3> - * - * <p>The {@link AmqpIO} {@link Read} can bind a AMQP listener endpoint and receive messages. It can - * also connect to a AMPQ broker (such as Apache Qpid or Apache ActiveMQ). - * - * <p>{@link AmqpIO} {@link Read} returns an unbounded {@link PCollection} of {@link Message} - * containing the received messages. - * - * <p>To configure a AMQP source, you have to provide a list of addresses where it will receive - * messages. An address has the following form: {@code - * [amqp[s]://][user[:password]@]domain[/[name]]} where {@code domain} can be one of {@code - * host | host:port | ip | ip:port | name}. NB: the {@code ~} character allows to bind a AMQP - * listener instead of connecting to a remote broker. For instance {@code amqp://~0.0.0.0:1234} - * will bind a AMQP listener on any network interface on the 1234 port number. - * - * <p>The following example illustrates how to configure a AMQP source: - * - * <pre>{@code - * - * pipeline.apply(AmqpIO.read() - * .withAddresses(Collections.singletonList("amqp://host:1234"))) - * - * }</pre> - * - * <h3>Sending messages to a AMQP endpoint</h3> - * - * <p>{@link AmqpIO} provides a sink to send {@link PCollection} elements as messages. - * - * <p>As for the {@link Read}, {@link AmqpIO} {@link Write} requires a list of addresses where to - * send messages. The following example illustrates how to configure the {@link AmqpIO} - * {@link Write}: - * - * <pre>{@code - * - * pipeline - * .apply(...) // provide PCollection<Message> - * .apply(AmqpIO.write()); - * - * }</pre> - */ -@Experimental(Experimental.Kind.SOURCE_SINK) -public class AmqpIO { - - public static Read read() { - return new AutoValue_AmqpIO_Read.Builder().setMaxNumRecords(Long.MAX_VALUE).build(); - } - - public static Write write() { - return new AutoValue_AmqpIO_Write(); - } - - private AmqpIO() { - } - - /** - * A {@link PTransform} to read/receive messages using AMQP 1.0 protocol. - */ - @AutoValue - public abstract static class Read extends PTransform<PBegin, PCollection<Message>> { - - @Nullable abstract List<String> addresses(); - abstract long maxNumRecords(); - @Nullable abstract Duration maxReadTime(); - - abstract Builder builder(); - - @AutoValue.Builder - abstract static class Builder { - abstract Builder setAddresses(List<String> addresses); - abstract Builder setMaxNumRecords(long maxNumRecords); - abstract Builder setMaxReadTime(Duration maxReadTime); - abstract Read build(); - } - - /** - * Define the AMQP addresses where to receive messages. - */ - public Read withAddresses(List<String> addresses) { - checkArgument(addresses != null, "AmqpIO.read().withAddresses(addresses) called with null" - + " addresses"); - checkArgument(!addresses.isEmpty(), "AmqpIO.read().withAddresses(addresses) called with " - + "empty addresses list"); - return builder().setAddresses(addresses).build(); - } - - /** - * Define the max number of records received by the {@link Read}. - * When the max number of records is lower than {@code Long.MAX_VALUE}, the {@link Read} will - * provide a bounded {@link PCollection}. - */ - public Read withMaxNumRecords(long maxNumRecords) { - checkArgument(maxReadTime() == null, - "maxNumRecord and maxReadTime are exclusive"); - return builder().setMaxNumRecords(maxNumRecords).build(); - } - - /** - * Define the max read time (duration) while the {@link Read} will receive messages. - * When this max read time is not null, the {@link Read} will provide a bounded - * {@link PCollection}. - */ - public Read withMaxReadTime(Duration maxReadTime) { - checkArgument(maxNumRecords() == Long.MAX_VALUE, - "maxNumRecord and maxReadTime are exclusive"); - return builder().setMaxReadTime(maxReadTime).build(); - } - - @Override - public void validate(PipelineOptions pipelineOptions) { - checkState(addresses() != null, "AmqIO.read() requires addresses list to be set via " - + "withAddresses(addresses)"); - checkState(!addresses().isEmpty(), "AmqIO.read() requires a non-empty addresses list to be" - + " set via withAddresses(addresses)"); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.add(DisplayData.item("addresses", Joiner.on(" ").join(addresses()))); - } - - @Override - public PCollection<Message> expand(PBegin input) { - org.apache.beam.sdk.io.Read.Unbounded<Message> unbounded = - org.apache.beam.sdk.io.Read.from(new UnboundedAmqpSource(this)); - - PTransform<PBegin, PCollection<Message>> transform = unbounded; - - if (maxNumRecords() != Long.MAX_VALUE) { - transform = unbounded.withMaxNumRecords(maxNumRecords()); - } else if (maxReadTime() != null) { - transform = unbounded.withMaxReadTime(maxReadTime()); - } - - return input.getPipeline().apply(transform); - } - - } - - private static class AmqpCheckpointMark implements UnboundedSource.CheckpointMark, Serializable { - - private transient Messenger messenger; - private transient List<Tracker> trackers = new ArrayList<>(); - - public AmqpCheckpointMark() { - } - - @Override - public void finalizeCheckpoint() { - for (Tracker tracker : trackers) { - // flag as not cumulative - messenger.accept(tracker, 0); - } - trackers.clear(); - } - - // set an empty list to messages when deserialize - private void readObject(java.io.ObjectInputStream stream) - throws java.io.IOException, ClassNotFoundException { - trackers = new ArrayList<>(); - } - - } - - private static class UnboundedAmqpSource - extends UnboundedSource<Message, AmqpCheckpointMark> { - - private final Read spec; - - public UnboundedAmqpSource(Read spec) { - this.spec = spec; - } - - @Override - public List<UnboundedAmqpSource> split(int desiredNumSplits, - PipelineOptions pipelineOptions) { - // amqp is a queue system, so, it's possible to have multiple concurrent sources, even if - // they bind the listener - List<UnboundedAmqpSource> sources = new ArrayList<>(); - for (int i = 0; i < Math.max(1, desiredNumSplits); ++i) { - sources.add(new UnboundedAmqpSource(spec)); - } - return sources; - } - - @Override - public UnboundedReader<Message> createReader(PipelineOptions pipelineOptions, - AmqpCheckpointMark checkpointMark) { - return new UnboundedAmqpReader(this, checkpointMark); - } - - @Override - public Coder<Message> getDefaultOutputCoder() { - return new AmqpMessageCoder(); - } - - @Override - public Coder<AmqpCheckpointMark> getCheckpointMarkCoder() { - return SerializableCoder.of(AmqpCheckpointMark.class); - } - - @Override - public void validate() { - spec.validate(null); - } - - } - - private static class UnboundedAmqpReader extends UnboundedSource.UnboundedReader<Message> { - - private final UnboundedAmqpSource source; - - private Messenger messenger; - private Message current; - private Instant currentTimestamp; - private Instant watermark = new Instant(Long.MIN_VALUE); - private AmqpCheckpointMark checkpointMark; - - public UnboundedAmqpReader(UnboundedAmqpSource source, AmqpCheckpointMark checkpointMark) { - this.source = source; - this.current = null; - if (checkpointMark != null) { - this.checkpointMark = checkpointMark; - } else { - this.checkpointMark = new AmqpCheckpointMark(); - } - } - - @Override - public Instant getWatermark() { - return watermark; - } - - @Override - public Instant getCurrentTimestamp() { - if (current == null) { - throw new NoSuchElementException(); - } - return currentTimestamp; - } - - @Override - public Message getCurrent() { - if (current == null) { - throw new NoSuchElementException(); - } - return current; - } - - @Override - public UnboundedSource.CheckpointMark getCheckpointMark() { - return checkpointMark; - } - - @Override - public UnboundedAmqpSource getCurrentSource() { - return source; - } - - @Override - public boolean start() throws IOException { - Read spec = source.spec; - messenger = Messenger.Factory.create(); - messenger.start(); - for (String address : spec.addresses()) { - messenger.subscribe(address); - } - checkpointMark.messenger = messenger; - return advance(); - } - - @Override - public boolean advance() { - messenger.recv(); - if (messenger.incoming() <= 0) { - current = null; - return false; - } - Message message = messenger.get(); - Tracker tracker = messenger.incomingTracker(); - checkpointMark.trackers.add(tracker); - currentTimestamp = new Instant(message.getCreationTime()); - watermark = currentTimestamp; - current = message; - return true; - } - - @Override - public void close() { - if (messenger != null) { - messenger.stop(); - } - } - - } - - /** - * A {@link PTransform} to send messages using AMQP 1.0 protocol. - */ - @AutoValue - public abstract static class Write extends PTransform<PCollection<Message>, PDone> { - - @Override - public PDone expand(PCollection<Message> input) { - input.apply(ParDo.of(new WriteFn(this))); - return PDone.in(input.getPipeline()); - } - - private static class WriteFn extends DoFn<Message, Void> { - - private final Write spec; - - private transient Messenger messenger; - - public WriteFn(Write spec) { - this.spec = spec; - } - - @Setup - public void setup() throws Exception { - messenger = Messenger.Factory.create(); - messenger.start(); - } - - @ProcessElement - public void processElement(ProcessContext processContext) throws Exception { - Message message = processContext.element(); - messenger.put(message); - messenger.send(); - } - - @Teardown - public void teardown() throws Exception { - if (messenger != null) { - messenger.stop(); - } - } - - } - - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java deleted file mode 100644 index 5a55260..0000000 --- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.amqp; - -import com.google.common.io.ByteStreams; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.BufferOverflowException; - -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.util.VarInt; -import org.apache.qpid.proton.message.Message; - -/** - * A coder for AMQP message. - */ -public class AmqpMessageCoder extends CustomCoder<Message> { - - private static final int[] MESSAGE_SIZES = new int[]{ - 8 * 1024, - 64 * 1024, - 1 * 1024 * 1024, - 64 * 1024 * 1024 - }; - - static AmqpMessageCoder of() { - return new AmqpMessageCoder(); - } - - @Override - public void encode(Message value, OutputStream outStream) throws CoderException, IOException { - for (int maxMessageSize : MESSAGE_SIZES) { - try { - encode(value, outStream, maxMessageSize); - return; - } catch (Exception e) { - continue; - } - } - throw new CoderException("Message is larger than the max size supported by the coder"); - } - - private void encode(Message value, OutputStream outStream, int messageSize) throws - IOException, BufferOverflowException { - byte[] data = new byte[messageSize]; - int bytesWritten = value.encode(data, 0, data.length); - VarInt.encode(bytesWritten, outStream); - outStream.write(data, 0, bytesWritten); - } - - @Override - public Message decode(InputStream inStream) throws CoderException, IOException { - Message message = Message.Factory.create(); - int bytesToRead = VarInt.decodeInt(inStream); - byte[] encodedMessage = new byte[bytesToRead]; - ByteStreams.readFully(inStream, encodedMessage); - message.decode(encodedMessage, 0, encodedMessage.length); - return message; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java deleted file mode 100644 index bc3445c..0000000 --- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.amqp; - -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableList; - -import java.util.List; - -import org.apache.beam.sdk.coders.CoderProvider; -import org.apache.beam.sdk.coders.CoderProviderRegistrar; -import org.apache.beam.sdk.coders.CoderProviders; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.qpid.proton.message.Message; - -/** - * A {@link CoderProviderRegistrar} for standard types used with {@link AmqpIO}. - */ -@AutoService(CoderProviderRegistrar.class) -public class AmqpMessageCoderProviderRegistrar implements CoderProviderRegistrar { - - @Override - public List<CoderProvider> getCoderProviders() { - return ImmutableList.of( - CoderProviders.forCoder(TypeDescriptor.of(Message.class), - AmqpMessageCoder.of())); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java deleted file mode 100644 index 091f234..0000000 --- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Transforms for reading and writing using AMQP 1.0 protocol. - */ -package org.apache.beam.sdk.io.amqp; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java deleted file mode 100644 index c8fe4e8..0000000 --- a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.amqp; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.net.ServerSocket; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.values.PCollection; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; -import org.apache.qpid.proton.message.Message; -import org.apache.qpid.proton.messenger.Messenger; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Tests on {@link AmqpIO}. - */ -@RunWith(JUnit4.class) -public class AmqpIOTest { - - private static final Logger LOG = LoggerFactory.getLogger(AmqpIOTest.class); - - private int port; - - @Rule public TestPipeline pipeline = TestPipeline.create(); - - @Before - public void findFreeNetworkPort() throws Exception { - LOG.info("Finding free network port"); - ServerSocket socket = new ServerSocket(0); - port = socket.getLocalPort(); - socket.close(); - } - - @Test - public void testRead() throws Exception { - PCollection<Message> output = pipeline.apply(AmqpIO.read() - .withMaxNumRecords(100) - .withAddresses(Collections.singletonList("amqp://~localhost:" + port))); - PAssert.thatSingleton(output.apply(Count.<Message>globally())).isEqualTo(100L); - - Thread sender = new Thread() { - public void run() { - try { - Thread.sleep(500); - Messenger sender = Messenger.Factory.create(); - sender.start(); - for (int i = 0; i < 100; i++) { - Message message = Message.Factory.create(); - message.setAddress("amqp://localhost:" + port); - message.setBody(new AmqpValue("Test " + i)); - sender.put(message); - sender.send(); - } - sender.stop(); - } catch (Exception e) { - LOG.error("Sender error", e); - } - } - }; - try { - sender.start(); - pipeline.run(); - } finally { - sender.join(); - } - } - - @Test - public void testWrite() throws Exception { - final List<String> received = new ArrayList<>(); - Thread receiver = new Thread() { - @Override - public void run() { - try { - Messenger messenger = Messenger.Factory.create(); - messenger.start(); - messenger.subscribe("amqp://~localhost:" + port); - while (received.size() < 100) { - messenger.recv(); - while (messenger.incoming() > 0) { - Message message = messenger.get(); - LOG.info("Received: " + message.getBody().toString()); - received.add(message.getBody().toString()); - } - } - messenger.stop(); - } catch (Exception e) { - LOG.error("Receiver error", e); - } - } - }; - LOG.info("Starting AMQP receiver"); - receiver.start(); - - List<Message> data = new ArrayList<>(); - for (int i = 0; i < 100; i++) { - Message message = Message.Factory.create(); - message.setBody(new AmqpValue("Test " + i)); - message.setAddress("amqp://localhost:" + port); - message.setSubject("test"); - data.add(message); - } - pipeline.apply(Create.of(data).withCoder(AmqpMessageCoder.of())).apply(AmqpIO.write()); - LOG.info("Starting pipeline"); - try { - pipeline.run(); - } finally { - LOG.info("Join receiver thread"); - receiver.join(); - } - - assertEquals(100, received.size()); - for (int i = 0; i < 100; i++) { - assertTrue(received.contains("AmqpValue{Test " + i + "}")); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java deleted file mode 100644 index 7a8efeb..0000000 --- a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.amqp; - -import static org.junit.Assert.assertEquals; - -import com.google.common.base.Joiner; - -import java.util.Collections; - -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; -import org.apache.qpid.proton.message.Message; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Test on {@link AmqpMessageCoder}. - */ -@RunWith(JUnit4.class) -public class AmqpMessageCoderTest { - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Test - public void encodeDecode() throws Exception { - Message message = Message.Factory.create(); - message.setBody(new AmqpValue("body")); - message.setAddress("address"); - message.setSubject("test"); - AmqpMessageCoder coder = AmqpMessageCoder.of(); - - Message clone = CoderUtils.clone(coder, message); - - assertEquals("AmqpValue{body}", clone.getBody().toString()); - assertEquals("address", clone.getAddress()); - assertEquals("test", clone.getSubject()); - } - - @Test - public void encodeDecodeTooMuchLargerMessage() throws Exception { - thrown.expect(CoderException.class); - Message message = Message.Factory.create(); - message.setAddress("address"); - message.setSubject("subject"); - String body = Joiner.on("").join(Collections.nCopies(64 * 1024 * 1024, " ")); - message.setBody(new AmqpValue(body)); - - AmqpMessageCoder coder = AmqpMessageCoder.of(); - - byte[] encoded = CoderUtils.encodeToByteArray(coder, message); - } - - @Test - public void encodeDecodeLargeMessage() throws Exception { - Message message = Message.Factory.create(); - message.setAddress("address"); - message.setSubject("subject"); - String body = Joiner.on("").join(Collections.nCopies(32 * 1024 * 1024, " ")); - message.setBody(new AmqpValue(body)); - - AmqpMessageCoder coder = AmqpMessageCoder.of(); - - Message clone = CoderUtils.clone(coder, message); - - clone.getBody().toString().equals(message.getBody().toString()); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/cassandra/pom.xml b/sdks/java/io/cassandra/pom.xml index c74477e..8249f57 100644 --- a/sdks/java/io/cassandra/pom.xml +++ b/sdks/java/io/cassandra/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java index 32905b7..b6f4ef6 100644 --- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java @@ -82,7 +82,7 @@ import org.slf4j.LoggerFactory; * .withEntity(Person.class)); * }</pre> */ -@Experimental(Experimental.Kind.SOURCE_SINK) +@Experimental public class CassandraIO { private static final Logger LOG = LoggerFactory.getLogger(CassandraIO.class); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/common/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/common/pom.xml b/sdks/java/io/common/pom.xml index df0d94b..f7525fd 100644 --- a/sdks/java/io/common/pom.xml +++ b/sdks/java/io/common/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java index 25ab929..387fd22 100644 --- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java @@ -71,7 +71,11 @@ public interface IOTestPipelineOptions extends TestPipelineOptions { Integer getElasticsearchHttpPort(); void setElasticsearchHttpPort(Integer value); - /* Cassandra */ + @Description("Tcp port for elasticsearch server") + @Default.Integer(9300) + Integer getElasticsearchTcpPort(); + void setElasticsearchTcpPort(Integer value); + @Description("Host for Cassandra server (host name/ip address)") @Default.String("cassandra-host") String getCassandraHost(); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/pom.xml b/sdks/java/io/elasticsearch/pom.xml index e0a7f21..03632ce 100644 --- a/sdks/java/io/elasticsearch/pom.xml +++ b/sdks/java/io/elasticsearch/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -137,14 +137,6 @@ <scope>test</scope> </dependency> - <!-- This optional dependency is used by the test framework. Avoids a warning --> - <dependency> - <groupId>net.java.dev.jna</groupId> - <artifactId>jna</artifactId> - <version>4.1.0</version> - <scope>test</scope> - </dependency> - <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 4d76887..f6ceef2 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -113,7 +113,7 @@ import org.elasticsearch.client.RestClientBuilder; * <p>Optionally, you can provide {@code withBatchSize()} and {@code withBatchSizeBytes()} * to specify the size of the write batch in number of documents or in bytes. */ -@Experimental(Experimental.Kind.SOURCE_SINK) +@Experimental public class ElasticsearchIO { public static Read read() { @@ -139,7 +139,7 @@ public class ElasticsearchIO { private static final ObjectMapper mapper = new ObjectMapper(); - static JsonNode parseResponse(Response response) throws IOException { + private static JsonNode parseResponse(Response response) throws IOException { return mapper.readValue(response.getEntity().getContent(), JsonNode.class); } @@ -264,7 +264,7 @@ public class ElasticsearchIO { builder.addIfNotNull(DisplayData.item("username", getUsername())); } - RestClient createClient() throws MalformedURLException { + private RestClient createClient() throws MalformedURLException { HttpHost[] hosts = new HttpHost[getAddresses().size()]; int i = 0; for (String address : getAddresses()) { @@ -455,7 +455,16 @@ public class ElasticsearchIO { while (shards.hasNext()) { Map.Entry<String, JsonNode> shardJson = shards.next(); String shardId = shardJson.getKey(); - sources.add(new BoundedElasticsearchSource(spec, shardId)); + JsonNode value = (JsonNode) shardJson.getValue(); + boolean isPrimaryShard = + value + .path(0) + .path("routing") + .path("primary") + .asBoolean(); + if (isPrimaryShard) { + sources.add(new BoundedElasticsearchSource(spec, shardId)); + } } checkArgument(!sources.isEmpty(), "No primary shard found"); return sources; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java index 203963d..b0d161f 100644 --- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java +++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java @@ -17,17 +17,19 @@ */ package org.apache.beam.sdk.io.elasticsearch; -import com.fasterxml.jackson.databind.JsonNode; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import org.apache.http.HttpEntity; -import org.apache.http.entity.ContentType; -import org.apache.http.message.BasicHeader; -import org.apache.http.nio.entity.NStringEntity; -import org.elasticsearch.client.Response; -import org.elasticsearch.client.RestClient; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.client.Requests; +import org.elasticsearch.index.IndexNotFoundException; /** Test utilities to use with {@link ElasticsearchIO}. */ class ElasticSearchIOTestUtils { @@ -39,68 +41,57 @@ class ElasticSearchIOTestUtils { } /** Deletes the given index synchronously. */ - static void deleteIndex(String index, RestClient restClient) throws IOException { - try { - restClient.performRequest("DELETE", String.format("/%s", index), new BasicHeader("", "")); - } catch (IOException e) { - // it is fine to ignore this expression as deleteIndex occurs in @before, - // so when the first tests is run, the index does not exist yet - if (!e.getMessage().contains("index_not_found_exception")){ - throw e; - } + static void deleteIndex(String index, Client client) throws Exception { + IndicesAdminClient indices = client.admin().indices(); + IndicesExistsResponse indicesExistsResponse = + indices.exists(new IndicesExistsRequest(index)).get(); + if (indicesExistsResponse.isExists()) { + indices.prepareClose(index).get(); + indices.delete(Requests.deleteIndexRequest(index)).get(); } } /** Inserts the given number of test documents into Elasticsearch. */ - static void insertTestDocuments(String index, String type, long numDocs, RestClient restClient) - throws IOException { + static void insertTestDocuments(String index, String type, long numDocs, Client client) + throws Exception { + final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setRefresh(true); List<String> data = ElasticSearchIOTestUtils.createDocuments( numDocs, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); - StringBuilder bulkRequest = new StringBuilder(); for (String document : data) { - bulkRequest.append(String.format("{ \"index\" : {} }%n%s%n", document)); + bulkRequestBuilder.add(client.prepareIndex(index, type, null).setSource(document)); } - String endPoint = String.format("/%s/%s/_bulk", index, type); - HttpEntity requestBody = - new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON); - Response response = restClient.performRequest("POST", endPoint, - Collections.singletonMap("refresh", "true"), requestBody, - new BasicHeader("", "")); - JsonNode searchResult = ElasticsearchIO.parseResponse(response); - boolean errors = searchResult.path("errors").asBoolean(); - if (errors){ + final BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet(); + if (bulkResponse.hasFailures()) { throw new IOException( - String.format("Failed to insert test documents in index %s", index)); + String.format( + "Cannot insert test documents in index %s : %s", + index, bulkResponse.buildFailureMessage())); } } /** - * Forces a refresh of the given index to make recently inserted documents available for search. + * Forces an upgrade of the given index to make recently inserted documents available for search. * * @return The number of docs in the index */ - static long refreshIndexAndGetCurrentNumDocs(String index, String type, RestClient restClient) - throws IOException { - long result = 0; + static long upgradeIndexAndGetCurrentNumDocs(String index, String type, Client client) { try { - String endPoint = String.format("/%s/_refresh", index); - restClient.performRequest("POST", endPoint, new BasicHeader("", "")); - - endPoint = String.format("/%s/%s/_search", index, type); - Response response = restClient.performRequest("GET", endPoint, new BasicHeader("", "")); - JsonNode searchResult = ElasticsearchIO.parseResponse(response); - result = searchResult.path("hits").path("total").asLong(); - } catch (IOException e) { + client.admin().indices().upgrade(new UpgradeRequest(index)).actionGet(); + SearchResponse response = + client.prepareSearch(index).setTypes(type).execute().actionGet(5000); + return response.getHits().getTotalHits(); // it is fine to ignore bellow exceptions because in testWriteWithBatchSize* sometimes, // we call upgrade before any doc have been written // (when there are fewer docs processed than batchSize). // In that cases index/type has not been created (created upon first doc insertion) - if (!e.getMessage().contains("index_not_found_exception")){ + } catch (IndexNotFoundException e) { + } catch (java.lang.IllegalArgumentException e) { + if (!e.getMessage().contains("No search type")) { throw e; } } - return result; + return 0; } /** http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java index 7c37e87..2d6393a 100644 --- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java +++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java @@ -32,7 +32,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; -import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.transport.TransportClient; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory; */ public class ElasticsearchIOIT { private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIOIT.class); - private static RestClient restClient; + private static TransportClient client; private static IOTestPipelineOptions options; private static ElasticsearchIO.ConnectionConfiguration readConnectionConfiguration; @Rule public TestPipeline pipeline = TestPipeline.create(); @@ -66,16 +66,16 @@ public class ElasticsearchIOIT { public static void beforeClass() throws Exception { PipelineOptionsFactory.register(IOTestPipelineOptions.class); options = TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class); + client = ElasticsearchTestDataSet.getClient(options); readConnectionConfiguration = ElasticsearchTestDataSet.getConnectionConfiguration( options, ElasticsearchTestDataSet.ReadOrWrite.READ); - restClient = readConnectionConfiguration.createClient(); } @AfterClass public static void afterClass() throws Exception { - ElasticsearchTestDataSet.deleteIndex(restClient, ElasticsearchTestDataSet.ReadOrWrite.WRITE); - restClient.close(); + ElasticsearchTestDataSet.deleteIndex(client, ElasticsearchTestDataSet.ReadOrWrite.WRITE); + client.close(); } @Test @@ -128,8 +128,8 @@ public class ElasticsearchIOIT { pipeline.run(); long currentNumDocs = - ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs( - ElasticsearchTestDataSet.ES_INDEX, ElasticsearchTestDataSet.ES_TYPE, restClient); + ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs( + ElasticsearchTestDataSet.ES_INDEX, ElasticsearchTestDataSet.ES_TYPE, client); assertEquals(ElasticsearchTestDataSet.NUM_DOCS, currentNumDocs); } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index b349a29..260af79 100644 --- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -39,11 +39,11 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.values.PCollection; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.RestClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; import org.hamcrest.CustomMatcher; import org.junit.AfterClass; import org.junit.Before; @@ -74,10 +74,9 @@ public class ElasticsearchIOTest implements Serializable { private static final long BATCH_SIZE_BYTES = 2048L; private static Node node; - private static RestClient restClient; private static ElasticsearchIO.ConnectionConfiguration connectionConfiguration; - @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + @ClassRule public static TemporaryFolder folder = new TemporaryFolder(); @Rule public TestPipeline pipeline = TestPipeline.create(); @@ -92,8 +91,8 @@ public class ElasticsearchIOTest implements Serializable { .put("cluster.name", "beam") .put("http.enabled", "true") .put("node.data", "true") - .put("path.data", TEMPORARY_FOLDER.getRoot().getPath()) - .put("path.home", TEMPORARY_FOLDER.getRoot().getPath()) + .put("path.data", folder.getRoot().getPath()) + .put("path.home", folder.getRoot().getPath()) .put("node.name", "beam") .put("network.host", ES_IP) .put("http.port", esHttpPort) @@ -101,29 +100,27 @@ public class ElasticsearchIOTest implements Serializable { // had problems with some jdk, embedded ES was too slow for bulk insertion, // and queue of 50 was full. No pb with real ES instance (cf testWrite integration test) .put("threadpool.bulk.queue_size", 100); - node = new Node(settingsBuilder.build()); + node = NodeBuilder.nodeBuilder().settings(settingsBuilder).build(); LOG.info("Elasticsearch node created"); node.start(); connectionConfiguration = ElasticsearchIO.ConnectionConfiguration.create( new String[] {"http://" + ES_IP + ":" + esHttpPort}, ES_INDEX, ES_TYPE); - restClient = connectionConfiguration.createClient(); } @AfterClass - public static void afterClass() throws IOException{ - restClient.close(); + public static void afterClass() { node.close(); } @Before public void before() throws Exception { - ElasticSearchIOTestUtils.deleteIndex(ES_INDEX, restClient); + ElasticSearchIOTestUtils.deleteIndex(ES_INDEX, node.client()); } @Test public void testSizes() throws Exception { - ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient); + ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client()); PipelineOptions options = PipelineOptionsFactory.create(); ElasticsearchIO.Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration); @@ -137,7 +134,7 @@ public class ElasticsearchIOTest implements Serializable { @Test public void testRead() throws Exception { - ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient); + ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client()); PCollection<String> output = pipeline.apply( @@ -153,7 +150,7 @@ public class ElasticsearchIOTest implements Serializable { @Test public void testReadWithQuery() throws Exception { - ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient); + ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client()); String query = "{\n" @@ -188,7 +185,7 @@ public class ElasticsearchIOTest implements Serializable { pipeline.run(); long currentNumDocs = - ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, restClient); + ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, node.client()); assertEquals(NUM_DOCS, currentNumDocs); QueryBuilder queryBuilder = QueryBuilders.queryStringQuery("Einstein").field("scientist"); @@ -261,8 +258,9 @@ public class ElasticsearchIOTest implements Serializable { if ((numDocsProcessed % 100) == 0) { // force the index to upgrade after inserting for the inserted docs // to be searchable immediately - long currentNumDocs = ElasticSearchIOTestUtils - .refreshIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, restClient); + long currentNumDocs = + ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs( + ES_INDEX, ES_TYPE, node.client()); if ((numDocsProcessed % BATCH_SIZE) == 0) { /* bundle end */ assertEquals( @@ -306,8 +304,8 @@ public class ElasticsearchIOTest implements Serializable { // force the index to upgrade after inserting for the inserted docs // to be searchable immediately long currentNumDocs = - ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs( - ES_INDEX, ES_TYPE, restClient); + ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs( + ES_INDEX, ES_TYPE, node.client()); if (sizeProcessed / BATCH_SIZE_BYTES > batchInserted) { /* bundle end */ assertThat( @@ -329,7 +327,7 @@ public class ElasticsearchIOTest implements Serializable { @Test public void testSplit() throws Exception { - ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient); + ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client()); PipelineOptions options = PipelineOptionsFactory.create(); ElasticsearchIO.Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java index 2a2dbe9..3a9aae6 100644 --- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java +++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java @@ -17,11 +17,13 @@ */ package org.apache.beam.sdk.io.elasticsearch; +import static java.net.InetAddress.getByName; import java.io.IOException; import org.apache.beam.sdk.io.common.IOTestPipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.transport.InetSocketTransportAddress; /** * Manipulates test data used by the {@link ElasticsearchIO} @@ -49,6 +51,7 @@ public class ElasticsearchTestDataSet { * -Dexec.mainClass=org.apache.beam.sdk.io.elasticsearch.ElasticsearchTestDataSet \ * -Dexec.args="--elasticsearchServer=1.2.3.4 \ * --elasticsearchHttpPort=9200 \ + * --elasticsearchTcpPort=9300" \ * -Dexec.classpathScope=test * </pre> * @@ -59,20 +62,29 @@ public class ElasticsearchTestDataSet { PipelineOptionsFactory.register(IOTestPipelineOptions.class); IOTestPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(IOTestPipelineOptions.class); - createAndPopulateReadIndex(options); + + createAndPopulateIndex(getClient(options), ReadOrWrite.READ); } - private static void createAndPopulateReadIndex(IOTestPipelineOptions options) throws Exception { - RestClient restClient = getConnectionConfiguration(options, ReadOrWrite.READ).createClient(); + private static void createAndPopulateIndex(TransportClient client, ReadOrWrite rOw) + throws Exception { // automatically creates the index and insert docs - try { - ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient); - } finally { - restClient.close(); - } + ElasticSearchIOTestUtils.insertTestDocuments( + (rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, ES_TYPE, NUM_DOCS, client); + } + + public static TransportClient getClient(IOTestPipelineOptions options) throws Exception { + TransportClient client = + TransportClient.builder() + .build() + .addTransportAddress( + new InetSocketTransportAddress( + getByName(options.getElasticsearchServer()), + options.getElasticsearchTcpPort())); + return client; } - static ElasticsearchIO.ConnectionConfiguration getConnectionConfiguration( + public static ElasticsearchIO.ConnectionConfiguration getConnectionConfiguration( IOTestPipelineOptions options, ReadOrWrite rOw) throws IOException { ElasticsearchIO.ConnectionConfiguration connectionConfiguration = ElasticsearchIO.ConnectionConfiguration.create( @@ -87,9 +99,8 @@ public class ElasticsearchTestDataSet { return connectionConfiguration; } - static void deleteIndex(RestClient restClient, ReadOrWrite rOw) throws Exception { - ElasticSearchIOTestUtils - .deleteIndex((rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, restClient); + public static void deleteIndex(TransportClient client, ReadOrWrite rOw) throws Exception { + ElasticSearchIOTestUtils.deleteIndex((rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, client); } /** Enum that tells whether we use the index for reading or for writing. */ http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml index a1495f2..8b53820 100644 --- a/sdks/java/io/google-cloud-platform/pom.xml +++ b/sdks/java/io/google-cloud-platform/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -93,12 +93,7 @@ <dependency> <groupId>com.google.api</groupId> - <artifactId>gax-grpc</artifactId> - </dependency> - - <dependency> - <groupId>com.google.cloud</groupId> - <artifactId>google-cloud-core-grpc</artifactId> + <artifactId>api-common</artifactId> </dependency> <dependency> @@ -258,6 +253,11 @@ <artifactId>proto-google-common-protos</artifactId> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <!-- Test dependencies --> <dependency> <groupId>org.apache.beam</groupId> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index e46b1d3..4393a63 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.NullableCoder; -import org.apache.beam.sdk.coders.ShardedKeyCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; @@ -58,7 +57,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java index c5c2462..edb1e0d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java @@ -23,7 +23,8 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.api.services.bigquery.model.TableSchema; import com.google.common.collect.Lists; import java.io.Serializable; -import java.lang.reflect.TypeVariable; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.util.List; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.CannotProvideCoderException; @@ -31,7 +32,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.ValueInSingleWindow; /** @@ -158,16 +158,21 @@ public abstract class DynamicDestinations<T, DestinationT> implements Serializab } // If dynamicDestinations doesn't provide a coder, try to find it in the coder registry. // We must first use reflection to figure out what the type parameter is. - TypeDescriptor<?> superDescriptor = - TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class); - if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) { - throw new AssertionError( - "Couldn't find the DynamicDestinations superclass of " + this.getClass()); + for (Type superclass = getClass().getGenericSuperclass(); + superclass != null; + superclass = ((Class) superclass).getGenericSuperclass()) { + if (superclass instanceof ParameterizedType) { + ParameterizedType parameterized = (ParameterizedType) superclass; + if (parameterized.getRawType() == DynamicDestinations.class) { + // DestinationT is the second parameter. + Type parameter = parameterized.getActualTypeArguments()[1]; + @SuppressWarnings("unchecked") + Class<DestinationT> parameterClass = (Class<DestinationT>) parameter; + return registry.getCoder(parameterClass); + } + } } - TypeVariable typeVariable = superDescriptor.getTypeParameter("DestinationT"); - @SuppressWarnings("unchecked") - TypeDescriptor<DestinationT> descriptor = - (TypeDescriptor<DestinationT>) superDescriptor.resolveType(typeVariable); - return registry.getCoder(descriptor); + throw new AssertionError( + "Couldn't find the DynamicDestinations superclass of " + this.getClass()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java index 55672ff..90d41a0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java @@ -23,7 +23,6 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.ShardedKey; /** * Given a write to a specific table, assign that to one of the http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java new file mode 100644 index 0000000..c2b739f --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import java.io.Serializable; +import java.util.Objects; + +/** + * A key and a shard number. + */ +class ShardedKey<K> implements Serializable { + private static final long serialVersionUID = 1L; + private final K key; + private final int shardNumber; + + public static <K> ShardedKey<K> of(K key, int shardNumber) { + return new ShardedKey<>(key, shardNumber); + } + + ShardedKey(K key, int shardNumber) { + this.key = key; + this.shardNumber = shardNumber; + } + + public K getKey() { + return key; + } + + public int getShardNumber() { + return shardNumber; + } + + @Override + public String toString() { + return "key: " + key + " shard: " + shardNumber; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ShardedKey)) { + return false; + } + ShardedKey<K> other = (ShardedKey<K>) o; + return Objects.equals(key, other.key) && Objects.equals(shardNumber, other.shardNumber); + } + + @Override + public int hashCode() { + return Objects.hash(key, shardNumber); + } +}
