[ 
https://issues.apache.org/jira/browse/BEAM-4048?focusedWorklogId=101090&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101090
 ]

ASF GitHub Bot logged work on BEAM-4048:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/May/18 11:46
            Start Date: 11/May/18 11:46
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #5249: [BEAM-4048] Refactor 
COMBINE mode for reading/writing from/to Pub/Sub and Kafka in Nexmark
URL: https://github.com/apache/beam/pull/5249
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
 
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index d2773894db1..ebc87054936 100644
--- 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ 
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -19,6 +19,8 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.nexmark.NexmarkUtils.PubSubMode.COMBINED;
+import static 
org.apache.beam.sdk.nexmark.NexmarkUtils.PubSubMode.SUBSCRIBE_ONLY;
 
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableRow;
@@ -101,6 +103,7 @@
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.slf4j.LoggerFactory;
 
 /**
@@ -782,7 +785,7 @@ private String logsDir(long now) {
   /**
    * Return source of events from Pubsub.
    */
-  private PCollection<Event> sourceEventsFromPubsub(Pipeline p, long now) {
+  private PCollection<Event> sourceEventsFromPubsub(Pipeline p) {
     NexmarkUtils.console("Reading events from Pubsub %s", pubsubSubscription);
 
     PubsubIO.Read<PubsubMessage> io =
@@ -809,14 +812,16 @@ public void processElement(ProcessContext c) throws 
IOException {
    * Send {@code events} to Kafka.
    */
   private void sinkEventsToKafka(PCollection<Event> events) {
-    PCollection<byte[]> eventToBytes =
-        events.apply("Event to bytes", ParDo.of(EVENT_TO_BYTEARRAY));
-    eventToBytes.apply(KafkaIO.<Void, byte[]>write()
-                    .withBootstrapServers(options.getBootstrapServers())
-                    .withTopic(options.getKafkaSinkTopic())
-                    .withValueSerializer(ByteArraySerializer.class)
-                    .values());
+    checkArgument((options.getBootstrapServers() != null), "Missing 
--bootstrapServers");
+    NexmarkUtils.console("Writing events to Kafka Topic %s", 
options.getKafkaTopic());
 
+    PCollection<byte[]> eventToBytes = events.apply("Event to bytes", 
ParDo.of(EVENT_TO_BYTEARRAY));
+    eventToBytes.apply(
+        KafkaIO.<Void, byte[]>write()
+            .withBootstrapServers(options.getBootstrapServers())
+            .withTopic(options.getKafkaTopic())
+            .withValueSerializer(ByteArraySerializer.class)
+            .values());
   }
 
   static final DoFn<KV<Long, byte[]>, Event> BYTEARRAY_TO_EVENT =
@@ -832,21 +837,23 @@ public void processElement(ProcessContext c) throws 
IOException {
   /**
    * Return source of events from Kafka.
    */
-  private PCollection<Event> sourceEventsFromKafka(Pipeline p) {
-    NexmarkUtils.console("Reading events from Kafka Topic %s", 
options.getKafkaSourceTopic());
+  private PCollection<Event> sourceEventsFromKafka(Pipeline p, final Instant 
now) {
+    checkArgument((options.getBootstrapServers() != null), "Missing 
--bootstrapServers");
+    NexmarkUtils.console("Reading events from Kafka Topic %s", 
options.getKafkaTopic());
 
-    checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()),
-        "Missing --bootstrapServers");
-
-    KafkaIO.Read<Long, byte[]> read = KafkaIO.<Long, byte[]>read()
+    KafkaIO.Read<Long, byte[]> read =
+        KafkaIO.<Long, byte[]>read()
             .withBootstrapServers(options.getBootstrapServers())
-            .withTopic(options.getKafkaSourceTopic())
+            .withTopic(options.getKafkaTopic())
             .withKeyDeserializer(LongDeserializer.class)
-            .withValueDeserializer(ByteArrayDeserializer.class);
+            .withValueDeserializer(ByteArrayDeserializer.class)
+            .withStartReadTime(now)
+            .withMaxNumRecords(
+                options.getNumEvents() != null ? options.getNumEvents() : 
Long.MAX_VALUE);
 
     return p
-      .apply(queryName + ".ReadKafkaEvents", read.withoutMetadata())
-      .apply(queryName + ".KafkaToEvents", ParDo.of(BYTEARRAY_TO_EVENT));
+        .apply(queryName + ".ReadKafkaEvents", read.withoutMetadata())
+        .apply(queryName + ".KafkaToEvents", ParDo.of(BYTEARRAY_TO_EVENT));
   }
 
   /**
@@ -887,14 +894,14 @@ private void sinkEventsToPubsub(PCollection<Event> 
events) {
    * Send {@code formattedResults} to Kafka.
    */
   private void sinkResultsToKafka(PCollection<String> formattedResults) {
-    checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()),
-            "Missing --bootstrapServers");
+    checkArgument((options.getBootstrapServers() != null), "Missing 
--bootstrapServers");
+    NexmarkUtils.console("Writing results to Kafka Topic %s", 
options.getKafkaResultsTopic());
 
     formattedResults.apply(
         queryName + ".WriteKafkaResults",
         KafkaIO.<Void, String>write()
             .withBootstrapServers(options.getBootstrapServers())
-            .withTopic(options.getKafkaSinkTopic())
+            .withTopic(options.getKafkaResultsTopic())
             .withValueSerializer(StringSerializer.class)
             .values());
   }
@@ -1027,7 +1034,7 @@ private void setupPubSubResources(long now) throws 
IOException {
    * Return source of events for this run, or null if we are simply publishing 
events
    * to Pubsub.
    */
-  private PCollection<Event> createSource(Pipeline p, final long now) throws 
IOException {
+  private PCollection<Event> createSource(Pipeline p, final Instant now) 
throws IOException {
     PCollection<Event> source = null;
 
     switch (configuration.sourceType) {
@@ -1038,33 +1045,45 @@ private void setupPubSubResources(long now) throws 
IOException {
         source = sourceEventsFromAvro(p);
         break;
       case KAFKA:
-        source = sourceEventsFromKafka(p);
-        break;
       case PUBSUB:
-        setupPubSubResources(now);
+        if (configuration.sourceType == SourceType.PUBSUB) {
+          setupPubSubResources(now.getMillis());
+        }
         // Setup the sink for the publisher.
         switch (configuration.pubSubMode) {
           case SUBSCRIBE_ONLY:
             // Nothing to publish.
             break;
           case PUBLISH_ONLY:
-            // Send synthesized events to Pubsub in this job.
-            sinkEventsToPubsub(
-                sourceEventsFromSynthetic(p)
-                    .apply(queryName + ".Snoop", 
NexmarkUtils.snoop(queryName)));
+            {
+              // Send synthesized events to Kafka or Pubsub in this job.
+              PCollection<Event> events =
+                  sourceEventsFromSynthetic(p)
+                      .apply(queryName + ".Snoop", 
NexmarkUtils.snoop(queryName));
+              if (configuration.sourceType == NexmarkUtils.SourceType.KAFKA) {
+                sinkEventsToKafka(events);
+              } else { // pubsub
+                sinkEventsToPubsub(events);
+              }
+            }
             break;
           case COMBINED:
-            // Send synthesized events to Pubsub in separate publisher job.
+            // Send synthesized events to Kafka or Pubsub in separate 
publisher job.
             // We won't start the main pipeline until the publisher has sent 
the pre-load events.
             // We'll shutdown the publisher job when we notice the main job 
has finished.
             invokeBuilderForPublishOnlyPipeline(
                 publishOnlyOptions -> {
-                  Pipeline sp = Pipeline.create(options);
+                  Pipeline sp = Pipeline.create(publishOnlyOptions);
                   NexmarkUtils.setupPipeline(configuration.coderStrategy, sp);
                   publisherMonitor = new Monitor<>(queryName, "publisher");
-                  sinkEventsToPubsub(
+                  PCollection<Event> events =
                       sourceEventsFromSynthetic(sp)
-                          .apply(queryName + ".Monitor", 
publisherMonitor.getTransform()));
+                          .apply(queryName + ".Monitor", 
publisherMonitor.getTransform());
+                  if (configuration.sourceType == 
NexmarkUtils.SourceType.KAFKA) {
+                    sinkEventsToKafka(events);
+                  } else { // pubsub
+                    sinkEventsToPubsub(events);
+                  }
                   publisherResult = sp.run();
                   NexmarkUtils.console("Publisher job is started.");
                 });
@@ -1078,8 +1097,21 @@ private void setupPubSubResources(long now) throws 
IOException {
             break;
           case SUBSCRIBE_ONLY:
           case COMBINED:
-            // Read events from pubsub.
-            source = sourceEventsFromPubsub(p, now);
+            {
+              // Read events from Kafka or Pubsub.
+              if (configuration.sourceType == NexmarkUtils.SourceType.KAFKA) {
+                // We need to have the same indexes for Publisher (sink) and 
Subscriber (source)
+                // pipelines in COMBINED mode (when we run them in sequence). 
It means that
+                // Subscriber should start reading from the same index as 
Publisher started to write
+                // pre-load events even if we run Subscriber right after 
Publisher has been
+                // finished. In other case. when pubSubMode=SUBSCRIBE_ONLY, 
now should be null and
+                // it will be ignored.
+                source =
+                    sourceEventsFromKafka(p, configuration.pubSubMode == 
COMBINED ? now : null);
+              } else {
+                source = sourceEventsFromPubsub(p);
+              }
+            }
             break;
         }
         break;
@@ -1213,7 +1245,7 @@ public NexmarkPerf run(NexmarkConfiguration 
runConfiguration) throws IOException
         return null;
       }
 
-      long now = System.currentTimeMillis();
+      final Instant now = Instant.now();
       Pipeline p = Pipeline.create(options);
       NexmarkUtils.setupPipeline(configuration.coderStrategy, p);
 
@@ -1238,7 +1270,7 @@ public NexmarkPerf run(NexmarkConfiguration 
runConfiguration) throws IOException
         if (configuration.query == 10) {
           String path = null;
           if (options.getOutputPath() != null && 
!options.getOutputPath().isEmpty()) {
-            path = logsDir(now);
+            path = logsDir(now.getMillis());
           }
           ((Query10) query).setOutputPath(path);
           ((Query10) query).setMaxNumWorkers(maxNumWorkers());
@@ -1259,7 +1291,7 @@ public NexmarkPerf run(NexmarkConfiguration 
runConfiguration) throws IOException
         }
 
         // Output results.
-        sink(results, now);
+        sink(results, now.getMillis());
       }
 
       mainResult = p.run();
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
 
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
index ac894d213ba..14a91bd71a3 100644
--- 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
+++ 
b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
@@ -408,19 +408,19 @@
 
   void setQueryLanguage(String value);
 
-  @Description("Base name of Kafka source topic in streaming mode.")
+  @Description("Base name of Kafka events topic in streaming mode.")
   @Nullable
-  @Default.String("nexmark-source")
-  String getKafkaSourceTopic();
+  @Default.String("nexmark")
+  String getKafkaTopic();
 
-  void setKafkaSourceTopic(String value);
+  void setKafkaTopic(String value);
 
-  @Description("Base name of Kafka sink topic in streaming mode.")
+  @Description("Base name of Kafka results topic in streaming mode.")
   @Nullable
-  @Default.String("nexmark-sink")
-  String getKafkaSinkTopic();
+  @Default.String("nexmark-results")
+  String getKafkaResultsTopic();
 
-  void setKafkaSinkTopic(String value);
+  void setKafkaResultsTopic(String value);
 
   @Description("Kafka Bootstrap Server domains.")
   @Nullable


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 101090)
    Time Spent: 3h 20m  (was: 3h 10m)

> Refactor COMBINE mode for reading and writing from/to Pub/Sub and Kafka in 
> Nexmark
> ----------------------------------------------------------------------------------
>
>                 Key: BEAM-4048
>                 URL: https://issues.apache.org/jira/browse/BEAM-4048
>             Project: Beam
>          Issue Type: Improvement
>          Components: examples-nexmark
>            Reporter: Etienne Chauchot
>            Assignee: Alexey Romanenko
>            Priority: Major
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to