[ https://issues.apache.org/jira/browse/BEAM-2852?focusedWorklogId=89403&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-89403 ]
ASF GitHub Bot logged work on BEAM-2852: ---------------------------------------- Author: ASF GitHub Bot Created on: 10/Apr/18 14:11 Start Date: 10/Apr/18 14:11 Worklog Time Spent: 10m Work Description: echauchot closed pull request #5019: [BEAM-2852] Add support for Kafka as source/sink on Nexmark URL: https://github.com/apache/beam/pull/5019 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/nexmark/build.gradle b/sdks/java/nexmark/build.gradle index b8f354d7af5..83a3d05f99d 100644 --- a/sdks/java/nexmark/build.gradle +++ b/sdks/java/nexmark/build.gradle @@ -27,6 +27,7 @@ dependencies { shadow project(path: ":beam-sdks-java-io-google-cloud-platform", configuration: "shadow") shadow project(path: ":beam-sdks-java-extensions-google-cloud-platform-core", configuration: "shadow") shadow project(path: ":beam-sdks-java-extensions-sql", configuration: "shadow") + shadow project(path: ":beam-sdks-java-io-kafka", configuration: "shadow") shadow library.java.google_api_services_bigquery shadow library.java.jackson_core shadow library.java.jackson_annotations @@ -38,6 +39,7 @@ dependencies { shadow library.java.junit shadow library.java.hamcrest_core shadow library.java.commons_lang3 + shadow library.java.kafka_clients shadow project(path: ":beam-runners-direct-java", configuration: "shadow") shadow library.java.slf4j_jdk14 testCompile library.java.hamcrest_core diff --git a/sdks/java/nexmark/pom.xml b/sdks/java/nexmark/pom.xml index 613ddc70a58..4d943751675 100644 --- a/sdks/java/nexmark/pom.xml +++ b/sdks/java/nexmark/pom.xml @@ -192,6 +192,15 @@ <argLine>-da</argLine> <!-- disable assert in Calcite converter validation --> </configuration> </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> </plugins> </build> @@ -274,13 +283,13 @@ <artifactId>hamcrest-core</artifactId> <scope>compile</scope> </dependency> - + <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-library</artifactId> <scope>compile</scope> </dependency> - + <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> @@ -303,5 +312,23 @@ <artifactId>commons-lang3</artifactId> <scope>runtime</scope> </dependency> + + <dependency> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-kafka</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.clients.version}</version> + </dependency> + </dependencies> </project> diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java index bd746ce537a..cd24897664a 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.nexmark; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import com.google.api.services.bigquery.model.TableFieldSchema; @@ -42,6 +43,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.MetricNameFilter; import org.apache.beam.sdk.metrics.MetricQueryResults; @@ -88,11 +90,16 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.joda.time.Duration; import org.slf4j.LoggerFactory; @@ -764,6 +771,69 @@ public void processElement(ProcessContext c) { })); } + static final DoFn<Event, byte[]> EVENT_TO_BYTEARRAY = + new DoFn<Event, byte[]>() { + @ProcessElement + public void processElement(ProcessContext c) { + try { + byte[] encodedEvent = CoderUtils.encodeToByteArray(Event.CODER, c.element()); + c.output(encodedEvent); + } catch (CoderException e1) { + LOG.error("Error while sending Event {} to Kafka: serialization error", + c.element().toString()); + } + } + }; + + /** + * Send {@code events} to Kafka. + */ + private void sinkEventsToKafka(PCollection<Event> events) { + PCollection<byte[]> eventToBytes = + events.apply("Event to bytes", ParDo.of(EVENT_TO_BYTEARRAY)); + eventToBytes.apply(KafkaIO.<Void, byte[]>write() + .withBootstrapServers(options.getBootstrapServers()) + .withTopic(options.getKafkaSinkTopic()) + .withValueSerializer(ByteArraySerializer.class) + .values()); + + } + + + static final DoFn<KV<Long, byte[]>, Event> BYTEARRAY_TO_EVENT = + new DoFn<KV<Long, byte[]>, Event>() { + @ProcessElement + public void processElement(ProcessContext c) { + byte[] encodedEvent = c.element().getValue(); + try { + Event event = CoderUtils.decodeFromByteArray(Event.CODER, encodedEvent); + c.output(event); + } catch (CoderException e) { + LOG.error("Error while decoding Event from Kafka message: serialization error"); + } + } + }; + + /** + * Return source of events from Kafka. + */ + private PCollection<Event> sourceEventsFromKafka(Pipeline p) { + NexmarkUtils.console("Reading events from Kafka Topic %s", options.getKafkaSourceTopic()); + + checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()), + "Missing --bootstrapServers"); + + KafkaIO.Read<Long, byte[]> read = KafkaIO.<Long, byte[]>read() + .withBootstrapServers(options.getBootstrapServers()) + .withTopic(options.getKafkaSourceTopic()) + .withKeyDeserializer(LongDeserializer.class) + .withValueDeserializer(ByteArrayDeserializer.class); + + return p + .apply(queryName + ".ReadKafkaEvents", read.withoutMetadata()) + .apply(queryName + ".KafkaToEvents", ParDo.of(BYTEARRAY_TO_EVENT)); + } + /** * Return Avro source of events from {@code options.getInputFilePrefix}. */ @@ -813,6 +883,22 @@ public void processElement(ProcessContext c) { .apply(queryName + ".WritePubsubEvents", io); } + /** + * Send {@code formattedResults} to Kafka. + */ + private void sinkResultsToKafka(PCollection<String> formattedResults) { + checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()), + "Missing --bootstrapServers"); + + formattedResults.apply( + queryName + ".WriteKafkaResults", + KafkaIO.<Void, String>write() + .withBootstrapServers(options.getBootstrapServers()) + .withTopic(options.getKafkaSinkTopic()) + .withValueSerializer(StringSerializer.class) + .values()); + } + /** * Send {@code formattedResults} to Pubsub. */ @@ -923,6 +1009,9 @@ private void sinkResultsToBigQuery( case AVRO: source = sourceEventsFromAvro(p); break; + case KAFKA: + source = sourceEventsFromKafka(p); + break; case PUBSUB: // Setup the sink for the publisher. switch (configuration.pubSubMode) { @@ -1010,6 +1099,9 @@ private void sink(PCollection<TimestampedValue<KnownSize>> results, long now) { case PUBSUB: sinkResultsToPubsub(formattedResults, now); break; + case KAFKA: + sinkResultsToKafka(formattedResults); + break; case TEXT: sinkResultsToText(formattedResults, now); break; diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java index 0c5c1c1c368..a3386b66d2b 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java @@ -406,4 +406,24 @@ String getQueryLanguage(); void setQueryLanguage(String value); + + @Description("Base name of Kafka source topic in streaming mode.") + @Nullable + @Default.String("nexmark-source") + String getKafkaSourceTopic(); + + void setKafkaSourceTopic(String value); + + @Description("Base name of Kafka sink topic in streaming mode.") + @Nullable + @Default.String("nexmark-sink") + String getKafkaSinkTopic(); + + void setKafkaSinkTopic(String value); + + @Description("Kafka Bootstrap Server domains.") + @Nullable + String getBootstrapServers(); + + void setBootstrapServers(String value); } diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java index 3eb6f79c3ea..5d89dbde439 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java @@ -99,7 +99,11 @@ /** * Read from a PubSub topic. It will be fed the same synthetic events by this pipeline. */ - PUBSUB + PUBSUB, + /** + * Read events from a Kafka topic. It will be fed the same synthetic events by this pipeline. + */ + KAFKA } /** @@ -118,6 +122,10 @@ * Write to a PubSub topic. It will be drained by this pipeline. */ PUBSUB, + /** + * Write to a Kafka topic. It will be drained by this pipeline. + */ + KAFKA, /** * Write to a text file. Only works in batch mode. */ @@ -129,7 +137,7 @@ /** * Write raw Events to BigQuery. */ - BIGQUERY, + BIGQUERY } /** ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 89403) Time Spent: 3h 40m (was: 3.5h) > Add support for Kafka as source/sink on Nexmark > ----------------------------------------------- > > Key: BEAM-2852 > URL: https://issues.apache.org/jira/browse/BEAM-2852 > Project: Beam > Issue Type: Improvement > Components: testing > Reporter: Ismaël Mejía > Assignee: Alexey Romanenko > Priority: Minor > Labels: newbie, nexmark, starter > Time Spent: 3h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)