[ 
https://issues.apache.org/jira/browse/BEAM-2852?focusedWorklogId=89403&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-89403
 ]

ASF GitHub Bot logged work on BEAM-2852:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Apr/18 14:11
            Start Date: 10/Apr/18 14:11
    Worklog Time Spent: 10m 
      Work Description: echauchot closed pull request #5019: [BEAM-2852] Add 
support for Kafka as source/sink on Nexmark
URL: https://github.com/apache/beam/pull/5019
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/nexmark/build.gradle b/sdks/java/nexmark/build.gradle
index b8f354d7af5..83a3d05f99d 100644
--- a/sdks/java/nexmark/build.gradle
+++ b/sdks/java/nexmark/build.gradle
@@ -27,6 +27,7 @@ dependencies {
   shadow project(path: ":beam-sdks-java-io-google-cloud-platform", 
configuration: "shadow")
   shadow project(path: 
":beam-sdks-java-extensions-google-cloud-platform-core", configuration: 
"shadow")
   shadow project(path: ":beam-sdks-java-extensions-sql", configuration: 
"shadow")
+  shadow project(path: ":beam-sdks-java-io-kafka", configuration: "shadow")
   shadow library.java.google_api_services_bigquery
   shadow library.java.jackson_core
   shadow library.java.jackson_annotations
@@ -38,6 +39,7 @@ dependencies {
   shadow library.java.junit
   shadow library.java.hamcrest_core
   shadow library.java.commons_lang3
+  shadow library.java.kafka_clients
   shadow project(path: ":beam-runners-direct-java", configuration: "shadow")
   shadow library.java.slf4j_jdk14
   testCompile library.java.hamcrest_core
diff --git a/sdks/java/nexmark/pom.xml b/sdks/java/nexmark/pom.xml
index 613ddc70a58..4d943751675 100644
--- a/sdks/java/nexmark/pom.xml
+++ b/sdks/java/nexmark/pom.xml
@@ -192,6 +192,15 @@
           <argLine>-da</argLine> <!-- disable assert in Calcite converter 
validation -->
         </configuration>
       </plugin>
+
+        <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+                <source>1.8</source>
+                <target>1.8</target>
+            </configuration>
+        </plugin>
     </plugins>
   </build>
 
@@ -274,13 +283,13 @@
       <artifactId>hamcrest-core</artifactId>
       <scope>compile</scope>
     </dependency>
-    
+
     <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-library</artifactId>
       <scope>compile</scope>
     </dependency>
-    
+
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-direct-java</artifactId>
@@ -303,5 +312,23 @@
       <artifactId>commons-lang3</artifactId>
       <scope>runtime</scope>
     </dependency>
+
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-kafka</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.clients.version}</version>
+    </dependency>
+
   </dependencies>
 </project>
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
 
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index bd746ce537a..cd24897664a 100644
--- 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ 
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.nexmark;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -42,6 +43,7 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
@@ -88,11 +90,16 @@
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.joda.time.Duration;
 import org.slf4j.LoggerFactory;
 
@@ -764,6 +771,69 @@ public void processElement(ProcessContext c) {
       }));
   }
 
+  static final DoFn<Event, byte[]> EVENT_TO_BYTEARRAY =
+          new DoFn<Event, byte[]>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) {
+              try {
+                byte[] encodedEvent = 
CoderUtils.encodeToByteArray(Event.CODER, c.element());
+                c.output(encodedEvent);
+              } catch (CoderException e1) {
+                LOG.error("Error while sending Event {} to Kafka: 
serialization error",
+                        c.element().toString());
+              }
+            }
+          };
+
+  /**
+   * Send {@code events} to Kafka.
+   */
+  private void sinkEventsToKafka(PCollection<Event> events) {
+    PCollection<byte[]> eventToBytes =
+        events.apply("Event to bytes", ParDo.of(EVENT_TO_BYTEARRAY));
+    eventToBytes.apply(KafkaIO.<Void, byte[]>write()
+                    .withBootstrapServers(options.getBootstrapServers())
+                    .withTopic(options.getKafkaSinkTopic())
+                    .withValueSerializer(ByteArraySerializer.class)
+                    .values());
+
+  }
+
+
+  static final DoFn<KV<Long, byte[]>, Event> BYTEARRAY_TO_EVENT =
+          new DoFn<KV<Long, byte[]>, Event>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) {
+              byte[] encodedEvent = c.element().getValue();
+              try {
+                Event event = CoderUtils.decodeFromByteArray(Event.CODER, 
encodedEvent);
+                c.output(event);
+              } catch (CoderException e) {
+                LOG.error("Error while decoding Event from Kafka message: 
serialization error");
+              }
+            }
+          };
+
+  /**
+   * Return source of events from Kafka.
+   */
+  private PCollection<Event> sourceEventsFromKafka(Pipeline p) {
+    NexmarkUtils.console("Reading events from Kafka Topic %s", 
options.getKafkaSourceTopic());
+
+    checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()),
+        "Missing --bootstrapServers");
+
+    KafkaIO.Read<Long, byte[]> read = KafkaIO.<Long, byte[]>read()
+            .withBootstrapServers(options.getBootstrapServers())
+            .withTopic(options.getKafkaSourceTopic())
+            .withKeyDeserializer(LongDeserializer.class)
+            .withValueDeserializer(ByteArrayDeserializer.class);
+
+    return p
+      .apply(queryName + ".ReadKafkaEvents", read.withoutMetadata())
+      .apply(queryName + ".KafkaToEvents", ParDo.of(BYTEARRAY_TO_EVENT));
+  }
+
   /**
    * Return Avro source of events from {@code options.getInputFilePrefix}.
    */
@@ -813,6 +883,22 @@ public void processElement(ProcessContext c) {
         .apply(queryName + ".WritePubsubEvents", io);
   }
 
+  /**
+   * Send {@code formattedResults} to Kafka.
+   */
+  private void sinkResultsToKafka(PCollection<String> formattedResults) {
+    checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()),
+            "Missing --bootstrapServers");
+
+    formattedResults.apply(
+        queryName + ".WriteKafkaResults",
+        KafkaIO.<Void, String>write()
+            .withBootstrapServers(options.getBootstrapServers())
+            .withTopic(options.getKafkaSinkTopic())
+            .withValueSerializer(StringSerializer.class)
+            .values());
+  }
+
   /**
    * Send {@code formattedResults} to Pubsub.
    */
@@ -923,6 +1009,9 @@ private void sinkResultsToBigQuery(
       case AVRO:
         source = sourceEventsFromAvro(p);
         break;
+      case KAFKA:
+        source = sourceEventsFromKafka(p);
+        break;
       case PUBSUB:
         // Setup the sink for the publisher.
         switch (configuration.pubSubMode) {
@@ -1010,6 +1099,9 @@ private void 
sink(PCollection<TimestampedValue<KnownSize>> results, long now) {
       case PUBSUB:
         sinkResultsToPubsub(formattedResults, now);
         break;
+      case KAFKA:
+        sinkResultsToKafka(formattedResults);
+        break;
       case TEXT:
         sinkResultsToText(formattedResults, now);
         break;
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
 
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
index 0c5c1c1c368..a3386b66d2b 100644
--- 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
+++ 
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
@@ -406,4 +406,24 @@
   String getQueryLanguage();
 
   void setQueryLanguage(String value);
+
+  @Description("Base name of Kafka source topic in streaming mode.")
+  @Nullable
+  @Default.String("nexmark-source")
+  String getKafkaSourceTopic();
+
+  void setKafkaSourceTopic(String value);
+
+  @Description("Base name of Kafka sink topic in streaming mode.")
+  @Nullable
+  @Default.String("nexmark-sink")
+  String getKafkaSinkTopic();
+
+  void setKafkaSinkTopic(String value);
+
+  @Description("Kafka Bootstrap Server domains.")
+  @Nullable
+  String getBootstrapServers();
+
+  void setBootstrapServers(String value);
 }
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java 
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
index 3eb6f79c3ea..5d89dbde439 100644
--- 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
+++ 
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
@@ -99,7 +99,11 @@
     /**
      * Read from a PubSub topic. It will be fed the same synthetic events by 
this pipeline.
      */
-    PUBSUB
+    PUBSUB,
+    /**
+     * Read events from a Kafka topic. It will be fed the same synthetic 
events by this pipeline.
+     */
+    KAFKA
   }
 
   /**
@@ -118,6 +122,10 @@
      * Write to a PubSub topic. It will be drained by this pipeline.
      */
     PUBSUB,
+    /**
+     * Write to a Kafka topic. It will be drained by this pipeline.
+     */
+    KAFKA,
     /**
      * Write to a text file. Only works in batch mode.
      */
@@ -129,7 +137,7 @@
     /**
      * Write raw Events to BigQuery.
      */
-    BIGQUERY,
+    BIGQUERY
   }
 
   /**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 89403)
    Time Spent: 3h 40m  (was: 3.5h)

> Add support for Kafka as source/sink on Nexmark
> -----------------------------------------------
>
>                 Key: BEAM-2852
>                 URL: https://issues.apache.org/jira/browse/BEAM-2852
>             Project: Beam
>          Issue Type: Improvement
>          Components: testing
>            Reporter: Ismaël Mejía
>            Assignee: Alexey Romanenko
>            Priority: Minor
>              Labels: newbie, nexmark, starter
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to