Port easy I/O transforms to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/269fbf38 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/269fbf38 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/269fbf38 Branch: refs/heads/master Commit: 269fbf386454ea77845e54764a125edba7039b03 Parents: ef5e31f Author: Kenneth Knowles <[email protected]> Authored: Wed Aug 3 20:22:26 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Aug 4 14:56:42 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 3 +- .../java/org/apache/beam/sdk/io/PubsubIO.java | 14 ++++---- .../apache/beam/sdk/io/PubsubUnboundedSink.java | 17 +++++---- .../beam/sdk/io/PubsubUnboundedSource.java | 7 ++-- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 36 +++++++++----------- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 12 +++---- .../beam/sdk/io/gcp/datastore/V1Beta3.java | 18 +++++----- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 10 +++--- .../sdk/io/gcp/bigtable/BigtableWriteIT.java | 6 ++-- .../sdk/io/gcp/datastore/V1Beta3TestUtil.java | 9 +++-- .../java/org/apache/beam/sdk/io/jms/JmsIO.java | 10 +++--- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 19 +++++------ .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 10 +++--- 13 files changed, 82 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index abcf415..fadd9c7 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -78,6 +78,7 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.OldDoFn; @@ -2715,7 +2716,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { @Nullable private PTransform<?, ?> transform; @Nullable - private OldDoFn<?, ?> doFn; + private DoFn<?, ?> doFn; /** * Builds an instance of this class from the overridden transform. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index 1902bca..2b27175 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -25,7 +25,7 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.options.PubsubOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -709,11 +709,11 @@ public class PubsubIO { * * <p>Public so can be suppressed by runners. */ - public class PubsubBoundedReader extends OldDoFn<Void, T> { + public class PubsubBoundedReader extends DoFn<Void, T> { private static final int DEFAULT_PULL_SIZE = 100; private static final int ACK_TIMEOUT_SEC = 60; - @Override + @ProcessElement public void processElement(ProcessContext c) throws IOException { try (PubsubClient pubsubClient = FACTORY.newClient(timestampLabel, idLabel, @@ -998,12 +998,12 @@ public class PubsubIO { * * <p>Public so can be suppressed by runners. */ - public class PubsubBoundedWriter extends OldDoFn<T, Void> { + public class PubsubBoundedWriter extends DoFn<T, Void> { private static final int MAX_PUBLISH_BATCH_SIZE = 100; private transient List<OutgoingMessage> output; private transient PubsubClient pubsubClient; - @Override + @StartBundle public void startBundle(Context c) throws IOException { this.output = new ArrayList<>(); // NOTE: idLabel is ignored. @@ -1012,7 +1012,7 @@ public class PubsubIO { c.getPipelineOptions().as(PubsubOptions.class)); } - @Override + @ProcessElement public void processElement(ProcessContext c) throws IOException { // NOTE: The record id is always null. OutgoingMessage message = @@ -1025,7 +1025,7 @@ public class PubsubIO { } } - @Override + @FinishBundle public void finishBundle(Context c) throws IOException { if (!output.isEmpty()) { publish(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java index 9e9536d..3014751 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java @@ -31,8 +31,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.PubsubOptions; import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -65,7 +65,6 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; - import javax.annotation.Nullable; /** @@ -78,7 +77,7 @@ import javax.annotation.Nullable; * <li>We try to send messages in batches while also limiting send latency. * <li>No stats are logged. Rather some counters are used to keep track of elements and batches. * <li>Though some background threads are used by the underlying netty system all actual Pubsub - * calls are blocking. We rely on the underlying runner to allow multiple {@link OldDoFn} instances + * calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn} instances * to execute concurrently and hide latency. * <li>A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer * to dedup messages. @@ -155,7 +154,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { /** * Convert elements to messages and shard them. */ - private static class ShardFn<T> extends OldDoFn<T, KV<Integer, OutgoingMessage>> { + private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> { private final Aggregator<Long, Long> elementCounter = createAggregator("elements", new Sum.SumLongFn()); private final Coder<T> elementCoder; @@ -168,7 +167,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { this.recordIdMethod = recordIdMethod; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { elementCounter.addValue(1L); byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element()); @@ -207,7 +206,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { * Publish messages to Pubsub in batches. */ private static class WriterFn - extends OldDoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> { + extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> { private final PubsubClientFactory pubsubFactory; private final TopicPath topic; private final String timestampLabel; @@ -253,14 +252,14 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { byteCounter.addValue((long) bytes); } - @Override + @StartBundle public void startBundle(Context c) throws Exception { checkState(pubsubClient == null, "startBundle invoked without prior finishBundle"); pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel, c.getPipelineOptions().as(PubsubOptions.class)); } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { List<OutgoingMessage> pubsubMessages = new ArrayList<>(publishBatchSize); int bytes = 0; @@ -285,7 +284,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { } } - @Override + @FinishBundle public void finishBundle(Context c) throws Exception { pubsubClient.close(); pubsubClient = null; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java index d98bd6a..f99b471 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java @@ -31,7 +31,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PubsubOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -77,7 +77,6 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; - import javax.annotation.Nullable; /** @@ -1107,7 +1106,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> // StatsFn // ================================================================================ - private static class StatsFn<T> extends OldDoFn<T, T> { + private static class StatsFn<T> extends DoFn<T, T> { private final Aggregator<Long, Long> elementCounter = createAggregator("elements", new Sum.SumLongFn()); @@ -1131,7 +1130,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> this.idLabel = idLabel; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { elementCounter.addValue(1L); c.output(c.element()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 2ba7562..ed2c32e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -103,7 +104,6 @@ import com.google.common.io.CountingOutputStream; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import org.apache.avro.generic.GenericRecord; import org.joda.time.Instant; import org.slf4j.Logger; @@ -135,7 +135,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; - import javax.annotation.Nullable; /** @@ -334,7 +333,7 @@ public class BigQueryIO { * <p>Each {@link TableRow} contains values indexed by column name. Here is a * sample processing function that processes a "line" column from rows: * <pre>{@code - * static class ExtractWordsFn extends OldDoFn<TableRow, String> { + * static class ExtractWordsFn extends DoFn<TableRow, String> { * public void processElement(ProcessContext c) { * // Get the "line" field of the TableRow object, split it into words, and emit them. * TableRow row = c.element(); @@ -706,8 +705,8 @@ public class BigQueryIO { input.getPipeline() .apply("Create(CleanupOperation)", Create.of(cleanupOperation)) .apply("Cleanup", ParDo.of( - new OldDoFn<CleanupOperation, Void>() { - @Override + new DoFn<CleanupOperation, Void>() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.element().cleanup(c.getPipelineOptions()); @@ -717,8 +716,8 @@ public class BigQueryIO { return outputs.get(mainOutput); } - private static class IdentityFn<T> extends OldDoFn<T, T> { - @Override + private static class IdentityFn<T> extends DoFn<T, T> { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element()); } @@ -1271,7 +1270,7 @@ public class BigQueryIO { * <p>Here is a sample transform that produces TableRow values containing * "word" and "count" columns: * <pre>{@code - * static class FormatCountsFn extends OldDoFn<KV<String, Long>, TableRow> { + * static class FormatCountsFn extends DoFn<KV<String, Long>, TableRow> { * public void processElement(ProcessContext c) { * TableRow row = new TableRow() * .set("word", c.element().getKey()) @@ -2307,11 +2306,11 @@ public class BigQueryIO { ///////////////////////////////////////////////////////////////////////////// /** - * Implementation of OldDoFn to perform streaming BigQuery write. + * Implementation of DoFn to perform streaming BigQuery write. */ @SystemDoFnInternal private static class StreamingWriteFn - extends OldDoFn<KV<ShardedKey<String>, TableRowInfo>, Void> { + extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> { /** TableSchema in JSON. Use String to make the class Serializable. */ private final String jsonTableSchema; @@ -2339,14 +2338,14 @@ public class BigQueryIO { } /** Prepares a target BigQuery table. */ - @Override + @StartBundle public void startBundle(Context context) { tableRows = new HashMap<>(); uniqueIdsForTableRows = new HashMap<>(); } /** Accumulates the input into JsonTableRows and uniqueIdsForTableRows. */ - @Override + @ProcessElement public void processElement(ProcessContext context) { String tableSpec = context.element().getKey().getKey(); List<TableRow> rows = getOrCreateMapListValue(tableRows, tableSpec); @@ -2357,7 +2356,7 @@ public class BigQueryIO { } /** Writes the accumulated rows into BigQuery with streaming API. */ - @Override + @FinishBundle public void finishBundle(Context context) throws Exception { BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class); @@ -2544,8 +2543,7 @@ public class BigQueryIO { * id is created by concatenating this randomUUID with a sequential number. */ private static class TagWithUniqueIdsAndTable - extends OldDoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>> - implements OldDoFn.RequiresWindowAccess { + extends DoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>> { /** TableSpec to write to. */ private final String tableSpec; @@ -2571,18 +2569,18 @@ public class BigQueryIO { } - @Override + @StartBundle public void startBundle(Context context) { randomUUID = UUID.randomUUID().toString(); } /** Tag the input with a unique id. */ - @Override - public void processElement(ProcessContext context) throws IOException { + @ProcessElement + public void processElement(ProcessContext context, BoundedWindow window) throws IOException { String uniqueId = randomUUID + sequenceNo++; ThreadLocalRandom randomGenerator = ThreadLocalRandom.current(); String tableSpec = tableSpecFromWindow( - context.getPipelineOptions().as(BigQueryOptions.class), context.window()); + context.getPipelineOptions().as(BigQueryOptions.class), window); // We output on keys 0-50 to ensure that there's enough batching for // BigQuery. context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, 50)), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 1f77e3e..bfdf4aa 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -31,7 +31,7 @@ import org.apache.beam.sdk.io.range.ByteKeyRange; import org.apache.beam.sdk.io.range.ByteKeyRangeTracker; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -55,7 +55,6 @@ import com.google.common.util.concurrent.Futures; import com.google.protobuf.ByteString; import io.grpc.Status; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +64,6 @@ import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.ConcurrentLinkedQueue; - import javax.annotation.Nullable; /** @@ -512,7 +510,7 @@ public class BigtableIO { return new BigtableServiceImpl(options); } - private class BigtableWriterFn extends OldDoFn<KV<ByteString, Iterable<Mutation>>, Void> { + private class BigtableWriterFn extends DoFn<KV<ByteString, Iterable<Mutation>>, Void> { public BigtableWriterFn(String tableId, BigtableService bigtableService) { this.tableId = checkNotNull(tableId, "tableId"); @@ -520,13 +518,13 @@ public class BigtableIO { this.failures = new ConcurrentLinkedQueue<>(); } - @Override + @StartBundle public void startBundle(Context c) throws Exception { bigtableWriter = bigtableService.openForWriting(tableId); recordsWritten = 0; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { checkForFailures(); Futures.addCallback( @@ -534,7 +532,7 @@ public class BigtableIO { ++recordsWritten; } - @Override + @FinishBundle public void finishBundle(Context c) throws Exception { bigtableWriter.close(); bigtableWriter = null; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java index 6f3663a..052feb3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java @@ -37,9 +37,9 @@ import org.apache.beam.sdk.io.Sink.Writer; import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Values; @@ -478,11 +478,11 @@ public class V1Beta3 { } /** - * A {@link OldDoFn} that splits a given query into multiple sub-queries, assigns them unique + * A {@link DoFn} that splits a given query into multiple sub-queries, assigns them unique * keys and outputs them as {@link KV}. */ @VisibleForTesting - static class SplitQueryFn extends OldDoFn<Query, KV<Integer, Query>> { + static class SplitQueryFn extends DoFn<Query, KV<Integer, Query>> { private final V1Beta3Options options; // number of splits to make for a given query private final int numSplits; @@ -505,13 +505,13 @@ public class V1Beta3 { this.datastoreFactory = datastoreFactory; } - @Override + @StartBundle public void startBundle(Context c) throws Exception { datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.projectId); querySplitter = datastoreFactory.getQuerySplitter(); } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { int key = 1; Query query = c.element(); @@ -559,10 +559,10 @@ public class V1Beta3 { } /** - * A {@link OldDoFn} that reads entities from Datastore for each query. + * A {@link DoFn} that reads entities from Datastore for each query. */ @VisibleForTesting - static class ReadFn extends OldDoFn<Query, Entity> { + static class ReadFn extends DoFn<Query, Entity> { private final V1Beta3Options options; private final V1Beta3DatastoreFactory datastoreFactory; // Datastore client @@ -578,13 +578,13 @@ public class V1Beta3 { this.datastoreFactory = datastoreFactory; } - @Override + @StartBundle public void startBundle(Context c) throws Exception { datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId()); } /** Read and output entities for the given query. */ - @Override + @ProcessElement public void processElement(ProcessContext context) throws Exception { Query query = context.element(); String namespace = options.getNamespace(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 1ea1f94..6d6eb60 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -22,6 +22,7 @@ import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.toJsonString; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static com.google.common.base.Preconditions.checkArgument; + import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -64,8 +65,8 @@ import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -131,7 +132,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; - import javax.annotation.Nullable; /** @@ -235,7 +235,7 @@ public class BigQueryIOTest implements Serializable { private Object[] pollJobReturns; private String executingProject; // Both counts will be reset back to zeros after serialization. - // This is a work around for OldDoFn's verifyUnmodified check. + // This is a work around for DoFn's verifyUnmodified check. private transient int startJobCallsCount; private transient int pollJobStatusCallsCount; @@ -571,8 +571,8 @@ public class BigQueryIOTest implements Serializable { .apply(BigQueryIO.Read.from("non-executing-project:somedataset.sometable") .withTestServices(fakeBqServices) .withoutValidation()) - .apply(ParDo.of(new OldDoFn<TableRow, String>() { - @Override + .apply(ParDo.of(new DoFn<TableRow, String>() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output((String) c.element().get("name")); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java index 83489a5..ee3a6f9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java @@ -23,7 +23,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -108,8 +108,8 @@ public class BigtableWriteIT implements Serializable { Pipeline p = Pipeline.create(options); p.apply(CountingInput.upTo(numRows)) - .apply(ParDo.of(new OldDoFn<Long, KV<ByteString, Iterable<Mutation>>>() { - @Override + .apply(ParDo.of(new DoFn<Long, KV<ByteString, Iterable<Mutation>>>() { + @ProcessElement public void processElement(ProcessContext c) { int index = c.element().intValue(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java index daed1cb..7eaf23e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java @@ -27,7 +27,7 @@ import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue; import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; @@ -60,7 +60,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.UUID; - import javax.annotation.Nullable; class V1Beta3TestUtil { @@ -109,9 +108,9 @@ class V1Beta3TestUtil { } /** - * A OldDoFn that creates entity for a long number. + * A DoFn that creates entity for a long number. */ - static class CreateEntityFn extends OldDoFn<Long, Entity> { + static class CreateEntityFn extends DoFn<Long, Entity> { private final String kind; @Nullable private final String namespace; @@ -124,7 +123,7 @@ class V1Beta3TestUtil { ancestorKey = makeAncestorKey(namespace, kind, ancestor); } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(makeEntity(c.element(), ancestorKey, kind, namespace)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index eeb02e6..557fe13 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -453,7 +453,7 @@ public class JmsIO { checkArgument((queue != null || topic != null), "Either queue or topic is required"); } - private static class JmsWriter extends OldDoFn<String, Void> { + private static class JmsWriter extends DoFn<String, Void> { private ConnectionFactory connectionFactory; private String queue; @@ -469,7 +469,7 @@ public class JmsIO { this.topic = topic; } - @Override + @StartBundle public void startBundle(Context c) throws Exception { if (producer == null) { this.connection = connectionFactory.createConnection(); @@ -486,7 +486,7 @@ public class JmsIO { } } - @Override + @ProcessElement public void processElement(ProcessContext ctx) throws Exception { String value = ctx.element(); @@ -499,7 +499,7 @@ public class JmsIO { } } - @Override + @FinishBundle public void finishBundle(Context c) throws Exception { producer.close(); producer = null; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 2271216..2383105 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -33,7 +33,7 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -94,7 +94,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import javax.annotation.Nullable; /** @@ -550,8 +549,8 @@ public class KafkaIO { return typedRead .apply(begin) .apply("Remove Kafka Metadata", - ParDo.of(new OldDoFn<KafkaRecord<K, V>, KV<K, V>>() { - @Override + ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() { + @ProcessElement public void processElement(ProcessContext ctx) { ctx.output(ctx.element().getKV()); } @@ -1315,8 +1314,8 @@ public class KafkaIO { public PDone apply(PCollection<V> input) { return input .apply("Kafka values with default key", - ParDo.of(new OldDoFn<V, KV<Void, V>>() { - @Override + ParDo.of(new DoFn<V, KV<Void, V>>() { + @ProcessElement public void processElement(ProcessContext ctx) throws Exception { ctx.output(KV.<Void, V>of(null, ctx.element())); } @@ -1326,9 +1325,9 @@ public class KafkaIO { } } - private static class KafkaWriter<K, V> extends OldDoFn<KV<K, V>, Void> { + private static class KafkaWriter<K, V> extends DoFn<KV<K, V>, Void> { - @Override + @StartBundle public void startBundle(Context c) throws Exception { // Producer initialization is fairly costly. Move this to future initialization api to avoid // creating a producer for each bundle. @@ -1341,7 +1340,7 @@ public class KafkaIO { } } - @Override + @ProcessElement public void processElement(ProcessContext ctx) throws Exception { checkForFailures(); @@ -1351,7 +1350,7 @@ public class KafkaIO { new SendCallback()); } - @Override + @FinishBundle public void finishBundle(Context c) throws Exception { producer.flush(); producer.close(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index d7b1921..9a89c36 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -33,10 +33,10 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.Min; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.RemoveDuplicates; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -280,8 +280,8 @@ public class KafkaIOTest { p.run(); } - private static class ElementValueDiff extends OldDoFn<Long, Long> { - @Override + private static class ElementValueDiff extends DoFn<Long, Long> { + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(c.element() - c.timestamp().getMillis()); } @@ -308,8 +308,8 @@ public class KafkaIOTest { p.run(); } - private static class RemoveKafkaMetadata<K, V> extends OldDoFn<KafkaRecord<K, V>, KV<K, V>> { - @Override + private static class RemoveKafkaMetadata<K, V> extends DoFn<KafkaRecord<K, V>, KV<K, V>> { + @ProcessElement public void processElement(ProcessContext ctx) throws Exception { ctx.output(ctx.element().getKV()); }
