[ 
https://issues.apache.org/jira/browse/BEAM-5961?focusedWorklogId=162657&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162657
 ]

ASF GitHub Bot logged work on BEAM-5961:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Nov/18 18:26
            Start Date: 05/Nov/18 18:26
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #6933: [BEAM-5961] 
Nexmark rollforward new query with test of Nexmark main
URL: https://github.com/apache/beam/pull/6933
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
index 3ca6a3562ac..fcccf8d2431 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
@@ -97,8 +97,7 @@ private Result(NexmarkConfiguration configuration, 
NexmarkPerf perf) {
     private final NexmarkLauncher<NexmarkOptions> nexmarkLauncher;
     private final NexmarkConfiguration configuration;
 
-    private Run(String[] args, NexmarkConfiguration configuration) {
-      NexmarkOptions options = 
PipelineOptionsFactory.fromArgs(args).as(NexmarkOptions.class);
+    private Run(NexmarkOptions options, NexmarkConfiguration configuration) {
       this.nexmarkLauncher = new NexmarkLauncher<>(options);
       this.configuration = configuration;
     }
@@ -112,9 +111,13 @@ public Result call() throws IOException {
 
   /** Entry point. */
   void runAll(String[] args) throws IOException {
-    Instant start = Instant.now();
     NexmarkOptions options =
         
PipelineOptionsFactory.fromArgs(args).withValidation().as(NexmarkOptions.class);
+    runAll(options);
+  }
+
+  void runAll(NexmarkOptions options) throws IOException {
+    Instant start = Instant.now();
     Map<NexmarkConfiguration, NexmarkPerf> baseline = 
loadBaseline(options.getBaselineFilename());
     Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>();
     Set<NexmarkConfiguration> configurations = 
options.getSuite().getConfigurations(options);
@@ -126,7 +129,7 @@ void runAll(String[] args) throws IOException {
     try {
       // Schedule all the configurations.
       for (NexmarkConfiguration configuration : configurations) {
-        completion.submit(new Run(args, configuration));
+        completion.submit(new Run(options, configuration));
       }
 
       // Collect all the results.
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
index 43b5a16c743..bcb097e53b6 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
@@ -56,6 +56,21 @@
    */
   @JsonProperty public NexmarkUtils.PubSubMode pubSubMode = 
NexmarkUtils.PubSubMode.COMBINED;
 
+  /** The type of side input to use. */
+  @JsonProperty public NexmarkUtils.SideInputType sideInputType = 
NexmarkUtils.SideInputType.DIRECT;
+
+  /** Specify the number of rows to write to the side input. */
+  @JsonProperty public int sideInputRowCount = 500;
+
+  /** Specify the number of shards to write to the side input. */
+  @JsonProperty public int sideInputNumShards = 3;
+
+  /**
+   * Specify a prefix URL for side input files, which will be created for use by 
queries that join the
+   * stream to static enrichment data.
+   */
+  @JsonProperty public String sideInputUrl = null;
+
   /**
    * Number of events to generate. If zero, generate as many as possible 
without overflowing
    * internal counters etc.
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index 7d29834f806..7a60e4d235d 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -50,6 +50,8 @@
 import org.apache.beam.sdk.nexmark.model.Event;
 import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.nexmark.model.Person;
+import org.apache.beam.sdk.nexmark.queries.BoundedSideInputJoin;
+import org.apache.beam.sdk.nexmark.queries.BoundedSideInputJoinModel;
 import org.apache.beam.sdk.nexmark.queries.NexmarkQuery;
 import org.apache.beam.sdk.nexmark.queries.NexmarkQueryModel;
 import org.apache.beam.sdk.nexmark.queries.Query0;
@@ -1110,6 +1112,10 @@ public NexmarkPerf run(NexmarkConfiguration 
runConfiguration) throws IOException
       // Generate events.
       PCollection<Event> source = createSource(p, now);
 
+      if (query.needsSideInput()) {
+        query.setSideInput(NexmarkUtils.prepareSideInput(p, configuration));
+      }
+
       if (options.getLogEvents()) {
         source = source.apply(queryName + ".Events.Log", 
NexmarkUtils.log(queryName + ".Events"));
       }
@@ -1199,6 +1205,7 @@ private NexmarkQuery getNexmarkQuery() {
         .put(NexmarkQueryName.HIGHEST_BID, new Query7Model(configuration))
         .put(NexmarkQueryName.MONITOR_NEW_USERS, new 
Query8Model(configuration))
         .put(NexmarkQueryName.WINNING_BIDS, new Query9Model(configuration))
+        .put(NexmarkQueryName.BOUNDED_SIDE_INPUT_JOIN, new 
BoundedSideInputJoinModel(configuration))
         .build();
   }
 
@@ -1242,6 +1249,7 @@ private NexmarkQuery getNexmarkQuery() {
         .put(NexmarkQueryName.LOG_TO_SHARDED_FILES, new Query10(configuration))
         .put(NexmarkQueryName.USER_SESSIONS, new Query11(configuration))
         .put(NexmarkQueryName.PROCESSING_TIME_WINDOWS, new 
Query12(configuration))
+        .put(NexmarkQueryName.BOUNDED_SIDE_INPUT_JOIN, new 
BoundedSideInputJoin(configuration))
         .build();
   }
 
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkQueryName.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkQueryName.java
index 462497b8274..ca21439d56c 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkQueryName.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkQueryName.java
@@ -39,7 +39,10 @@
   WINNING_BIDS(9), // Query "9"
   LOG_TO_SHARDED_FILES(10), // Query "10"
   USER_SESSIONS(11), // Query "11"
-  PROCESSING_TIME_WINDOWS(12); // Query "12"
+  PROCESSING_TIME_WINDOWS(12), // Query "12"
+
+  // Other non-numbered queries
+  BOUNDED_SIDE_INPUT_JOIN;
 
   private @Nullable Integer number;
 
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
index cd01cca3149..72dccc4df34 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
@@ -17,14 +17,20 @@
  */
 package org.apache.beam.sdk.nexmark;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 import com.google.common.hash.Hashing;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
@@ -33,7 +39,10 @@
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.nexmark.model.Auction;
@@ -58,9 +67,12 @@
 import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -123,6 +135,14 @@
     COMBINED
   }
 
+  /** Possible side input sources. */
+  public enum SideInputType {
+    /** Produce the side input via {@link Create}. */
+    DIRECT,
+    /** Read side input from CSV files. */
+    CSV
+  }
+
   /** Coder strategies. */
   public enum CoderStrategy {
     /** Hand-written. */
@@ -611,6 +631,107 @@ public void processElement(ProcessContext c) {
         });
   }
 
+  private static class GenerateSideInputData
+      extends PTransform<PBegin, PCollection<KV<Long, String>>> {
+
+    private final NexmarkConfiguration config;
+
+    private GenerateSideInputData(NexmarkConfiguration config) {
+      this.config = config;
+    }
+
+    @Override
+    public PCollection<KV<Long, String>> expand(PBegin input) {
+      return input
+          .apply(GenerateSequence.from(0).to(config.sideInputRowCount))
+          .apply(
+              MapElements.via(
+                  new SimpleFunction<Long, KV<Long, String>>() {
+                    @Override
+                    public KV<Long, String> apply(Long input) {
+                      return KV.of(input, String.valueOf(input));
+                    }
+                  }));
+    }
+  }
+
+  /**
+   * Write data to be read as a side input.
+   *
+   * <p>Contains pairs of a number and its string representation to model 
lookups of some enrichment
+   * data by id.
+   *
+   * <p>Generated data covers the range {@code [0, sideInputRowCount)} so 
lookup joins on any
+   * desired id field can be modeled by looking up {@code id % 
sideInputRowCount}.
+   */
+  public static PCollection<KV<Long, String>> prepareSideInput(
+      Pipeline queryPipeline, NexmarkConfiguration config) {
+
+    checkArgument(
+        config.sideInputRowCount > 0, "Side input required but 
sideInputRowCount is not >0");
+
+    PTransform<PBegin, PCollection<KV<Long, String>>> generateSideInputData =
+        new GenerateSideInputData(config);
+
+    switch (config.sideInputType) {
+      case DIRECT:
+        return queryPipeline.apply(generateSideInputData);
+      case CSV:
+        checkArgument(
+            config.sideInputUrl != null,
+            "Side input type %s requires a URL but sideInputUrl not specified",
+            SideInputType.CSV.toString());
+
+        checkArgument(
+            config.sideInputNumShards > 0,
+            "Side input type %s requires explicit numShards but 
sideInputNumShards not specified",
+            SideInputType.CSV.toString());
+
+        Pipeline tempPipeline = Pipeline.create();
+        tempPipeline
+            .apply(generateSideInputData)
+            .apply(
+                MapElements.via(
+                    new SimpleFunction<KV<Long, String>, String>(
+                        kv -> String.format("%s,%s", kv.getKey(), 
kv.getValue())) {}))
+            
.apply(TextIO.write().withNumShards(config.sideInputNumShards).to(config.sideInputUrl));
+        tempPipeline.run().waitUntilFinish();
+
+        return queryPipeline
+            .apply(TextIO.read().from(config.sideInputUrl + "*"))
+            .apply(
+                MapElements.via(
+                    new SimpleFunction<String, KV<Long, String>>(
+                        line -> {
+                          List<String> cols = 
ImmutableList.copyOf(Splitter.on(",").split(line));
+                          return KV.of(Long.valueOf(cols.get(0)), cols.get(1));
+                        }) {}));
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unknown type of side input requested: %s", 
config.sideInputType));
+    }
+  }
+
+  /** Frees any resources used to make the side input available. */
+  public static void cleanUpSideInput(NexmarkConfiguration config) throws 
IOException {
+    switch (config.sideInputType) {
+      case DIRECT:
+        break;
+      case CSV:
+        FileSystems.delete(
+            FileSystems.match(config.sideInputUrl + "*")
+                .metadata()
+                .stream()
+                .map(metadata -> metadata.resourceId())
+                .collect(Collectors.toList()));
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Unknown type of %s clean up requested", 
SideInputType.class.getSimpleName()));
+    }
+  }
+
   /**
    * A coder for instances of {@code T} cast up to {@link KnownSize}.
    *
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoin.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoin.java
new file mode 100644
index 00000000000..cb9b9ab199d
--- /dev/null
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoin.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Map;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Query that joins a stream to a bounded side input, modeling basic stream 
enrichment.
+ *
+ * <pre>
+ * SELECT bid.*, sideInput.*
+ * FROM bid, sideInput
+ * WHERE bid.id = sideInput.id
+ * </pre>
+ */
+public class BoundedSideInputJoin extends NexmarkQuery {
+  public BoundedSideInputJoin(NexmarkConfiguration configuration) {
+    super(configuration, "JoinToFiles");
+  }
+
+  @Override
+  public boolean needsSideInput() {
+    return true;
+  }
+
+  private PCollection<Bid> applyTyped(PCollection<Event> events) {
+
+    checkState(getSideInput() != null, "Configuration error: side input is 
null");
+
+    final PCollectionView<Map<Long, String>> sideInputMap = 
getSideInput().apply(View.asMap());
+
+    return events
+        // Only want the bid events; easier to fake some side input data
+        .apply(JUST_BIDS)
+
+        // Look up each bid's bidder id (modulo sideInputRowCount) in the side input map.
+        .apply(
+            name + ".JoinToFiles",
+            ParDo.of(
+                    new DoFn<Bid, Bid>() {
+                      @ProcessElement
+                      public void processElement(ProcessContext c) {
+                        Bid bid = c.element();
+                        c.output(
+                            new Bid(
+                                bid.auction,
+                                bid.bidder,
+                                bid.price,
+                                bid.dateTime,
+                                c.sideInput(sideInputMap)
+                                    .get(bid.bidder % 
configuration.sideInputRowCount)));
+                      }
+                    })
+                .withSideInputs(sideInputMap));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinModel.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinModel.java
new file mode 100644
index 00000000000..822f343244b
--- /dev/null
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinModel.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import java.util.Collection;
+import java.util.Iterator;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+/** A direct implementation of {@link BoundedSideInputJoin}. */
+public class BoundedSideInputJoinModel extends NexmarkQueryModel {
+
+  /** Simulator for the bounded side input join query. */
+  private static class Simulator extends AbstractSimulator<Event, Bid> {
+    private final NexmarkConfiguration configuration;
+
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+      this.configuration = configuration;
+    }
+
+    @Override
+    protected void run() {
+      TimestampedValue<Event> timestampedEvent = nextInput();
+      if (timestampedEvent == null) {
+        allDone();
+        return;
+      }
+      Event event = timestampedEvent.getValue();
+      if (event.bid == null) {
+        // Ignore non-bid events.
+        return;
+      }
+
+      // Join to the side input is always a string representation of the id 
being looked up
+      Bid bid = event.bid;
+      Bid resultBid =
+          new Bid(
+              bid.auction,
+              bid.bidder,
+              bid.price,
+              bid.dateTime,
+              String.valueOf(bid.bidder % configuration.sideInputRowCount));
+      TimestampedValue<Bid> result =
+          TimestampedValue.of(resultBid, timestampedEvent.getTimestamp());
+      addResult(result);
+    }
+  }
+
+  public BoundedSideInputJoinModel(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> 
itr) {
+    return toValueTimestamp(itr);
+  }
+}
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
index 5123342e93a..b47a0f12d35 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.nexmark.queries;
 
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.nexmark.Monitor;
@@ -198,6 +199,7 @@ public void processElement(ProcessContext c) {
   public final Monitor<KnownSize> resultMonitor;
   private final Monitor<Event> endOfStreamMonitor;
   private final Counter fatalCounter;
+  private transient PCollection<KV<Long, String>> sideInput = null;
 
   protected NexmarkQuery(NexmarkConfiguration configuration, String name) {
     super(name);
@@ -218,6 +220,26 @@ protected NexmarkQuery(NexmarkConfiguration configuration, 
String name) {
   /** Implement the actual query. All we know about the result is it has a 
known encoded size. */
   protected abstract PCollection<KnownSize> applyPrim(PCollection<Event> 
events);
 
+  /** Whether this query expects a side input to be populated. Defaults to 
{@code false}. */
+  public boolean needsSideInput() {
+    return false;
+  }
+
+  /**
+   * Set the side input for the query.
+   *
+   * <p>Note that due to the nature of side inputs, this instance of the query 
is now fixed and can
+   * only be safely applied in the pipeline where the side input was created.
+   */
+  public void setSideInput(PCollection<KV<Long, String>> sideInput) {
+    this.sideInput = sideInput;
+  }
+
+  /** Get the side input, if any. */
+  public @Nullable PCollection<KV<Long, String>> getSideInput() {
+    return sideInput;
+  }
+
   @Override
   public PCollection<TimestampedValue<KnownSize>> expand(PCollection<Event> 
events) {
 
diff --git 
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/MainTest.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/MainTest.java
new file mode 100644
index 00000000000..23211d30fa4
--- /dev/null
+++ 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/MainTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark;
+
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+
+/**
+ * Test of {@link Main}; using {@link NexmarkOptions} since we don't want to 
test option parsing
+ * here.
+ */
+public class MainTest {
+  @Test
+  public void testSmokeSuiteOnDirectRunner() throws Exception {
+    NexmarkOptions options = 
PipelineOptionsFactory.create().as(NexmarkOptions.class);
+    // Default for SMOKE is 100k or 10k for heavier queries - way overkill for 
"smoke" test
+    options.setNumEvents(500L);
+    options.setSuite(NexmarkSuite.SMOKE);
+    options.setManageResources(false);
+    new Main().runAll(options);
+  }
+}
diff --git 
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/NexmarkUtilsTest.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/NexmarkUtilsTest.java
new file mode 100644
index 00000000000..975a595d33c
--- /dev/null
+++ 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/NexmarkUtilsTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark;
+
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link NexmarkUtils}, covering preparation of the CSV side input. 
*/
+@RunWith(JUnit4.class)
+public class NexmarkUtilsTest {
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void testPrepareCsvSideInput() throws Exception {
+    NexmarkConfiguration config = NexmarkConfiguration.DEFAULT.copy();
+    config.sideInputType = NexmarkUtils.SideInputType.CSV;
+    ResourceId sideInputResourceId =
+        FileSystems.matchNewResource(
+            String.format(
+                "%s/JoinToFiles-%s",
+                pipeline.getOptions().getTempLocation(), new 
Random().nextInt()),
+            false);
+    config.sideInputUrl = sideInputResourceId.toString();
+    config.sideInputRowCount = 10000;
+    config.sideInputNumShards = 15;
+
+    PCollection<KV<Long, String>> sideInput = 
NexmarkUtils.prepareSideInput(pipeline, config);
+    try {
+      PAssert.that(sideInput)
+          .containsInAnyOrder(
+              LongStream.range(0, config.sideInputRowCount)
+                  .boxed()
+                  .map(l -> KV.of(l, l.toString()))
+                  .collect(Collectors.toList()));
+      pipeline.run();
+    } finally {
+      NexmarkUtils.cleanUpSideInput(config);
+    }
+  }
+}
diff --git 
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinTest.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinTest.java
new file mode 100644
index 00000000000..da58d24707c
--- /dev/null
+++ 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Iterables;
+import java.util.Random;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests that {@link BoundedSideInputJoin} yields results coherent with its model. */
+@RunWith(JUnit4.class)
+public class BoundedSideInputJoinTest {
+
+  @Rule public TestPipeline p = TestPipeline.create();
+
+  @Before
+  public void setupPipeline() {
+    NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, p);
+  }
+
+  /** Returns a copy of the default configuration with the settings shared by these tests. */
+  private static NexmarkConfiguration smallConfig(NexmarkUtils.SideInputType sideInputType) {
+    NexmarkConfiguration config = NexmarkConfiguration.DEFAULT.copy();
+    config.sideInputType = sideInputType;
+    config.numEventGenerators = 1;
+    config.numEvents = 5000;
+    config.sideInputRowCount = 10;
+    config.sideInputNumShards = 3;
+    return config;
+  }
+
+  /**
+   * Runs {@code query} over generated events and asserts the results with {@code model}.
+   *
+   * <p>Points {@code config.sideInputUrl} at a randomly suffixed resource under the pipeline's
+   * temp location, and calls {@link NexmarkUtils#cleanUpSideInput} in a finally block.
+   *
+   * @param name prefix for the name of the read step
+   * @param config configuration for the event source and side input; its URL is overwritten
+   * @param query the query under test
+   * @param model supplies the assertion applied to the query results
+   * @param streamingMode whether to read from the unbounded source instead of the bounded one
+   */
+  private void queryMatchesModel(
+      String name,
+      NexmarkConfiguration config,
+      NexmarkQuery query,
+      NexmarkQueryModel model,
+      boolean streamingMode)
+      throws Exception {
+    ResourceId sideInputResourceId =
+        FileSystems.matchNewResource(
+            String.format(
+                "%s/BoundedSideInputJoin-%s",
+                p.getOptions().getTempLocation(), new Random().nextInt()),
+            false);
+    config.sideInputUrl = sideInputResourceId.toString();
+
+    try {
+      PCollection<KV<Long, String>> sideInput = NexmarkUtils.prepareSideInput(p, config);
+      query.setSideInput(sideInput);
+
+      PCollection<TimestampedValue<KnownSize>> results;
+      if (streamingMode) {
+        results =
+            p.apply(name + ".ReadUnBounded", NexmarkUtils.streamEventsSource(config)).apply(query);
+      } else {
+        results =
+            p.apply(name + ".ReadBounded", NexmarkUtils.batchEventsSource(config)).apply(query);
+      }
+      PAssert.that(results).satisfies(model.assertionFor());
+      PipelineResult result = p.run();
+      result.waitUntilFinish();
+    } finally {
+      NexmarkUtils.cleanUpSideInput(config);
+    }
+  }
+
+  /**
+   * A smoke test that the count of input bids and outputs are the same, to help diagnose flakiness
+   * in more complex tests.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void inputOutputSameEvents() throws Exception {
+    NexmarkConfiguration config = smallConfig(NexmarkUtils.SideInputType.DIRECT);
+    PCollection<KV<Long, String>> sideInput = NexmarkUtils.prepareSideInput(p, config);
+
+    try {
+      PCollection<Event> input = p.apply(NexmarkUtils.batchEventsSource(config));
+      PCollection<Bid> justBids = input.apply(NexmarkQuery.JUST_BIDS);
+      PCollection<Long> bidCount = justBids.apply("Count Bids", Count.globally());
+
+      NexmarkQuery query = new BoundedSideInputJoin(config);
+      query.setSideInput(sideInput);
+
+      PCollection<TimestampedValue<KnownSize>> output = input.apply(query);
+      PCollection<Long> outputCount = output.apply("Count outputs", Count.globally());
+
+      // Both counts land in one collection so a single assertion can compare them.
+      PAssert.that(PCollectionList.of(bidCount).and(outputCount).apply(Flatten.pCollections()))
+          .satisfies(
+              counts -> {
+                assertThat(Iterables.size(counts), equalTo(2));
+                assertThat(Iterables.get(counts, 0), greaterThan(0L));
+                assertThat(Iterables.get(counts, 0), equalTo(Iterables.get(counts, 1)));
+                return null;
+              });
+      // Wait for completion before the finally block cleans up, as queryMatchesModel does.
+      p.run().waitUntilFinish();
+    } finally {
+      NexmarkUtils.cleanUpSideInput(config);
+    }
+  }
+
+  /** Checks the query against its model in batch mode with a DIRECT side input. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void queryMatchesModelBatchDirect() throws Exception {
+    NexmarkConfiguration config = smallConfig(NexmarkUtils.SideInputType.DIRECT);
+    queryMatchesModel(
+        "BoundedSideInputJoinTestBatch",
+        config,
+        new BoundedSideInputJoin(config),
+        new BoundedSideInputJoinModel(config),
+        false);
+  }
+
+  /** Checks the query against its model in streaming mode with a DIRECT side input. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void queryMatchesModelStreamingDirect() throws Exception {
+    NexmarkConfiguration config = smallConfig(NexmarkUtils.SideInputType.DIRECT);
+    queryMatchesModel(
+        "BoundedSideInputJoinTestStreaming",
+        config,
+        new BoundedSideInputJoin(config),
+        new BoundedSideInputJoinModel(config),
+        true);
+  }
+
+  /** Checks the query against its model in batch mode with a CSV side input. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void queryMatchesModelBatchCsv() throws Exception {
+    NexmarkConfiguration config = smallConfig(NexmarkUtils.SideInputType.CSV);
+    queryMatchesModel(
+        "BoundedSideInputJoinTestBatch",
+        config,
+        new BoundedSideInputJoin(config),
+        new BoundedSideInputJoinModel(config),
+        false);
+  }
+
+  /** Checks the query against its model in streaming mode with a CSV side input. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void queryMatchesModelStreamingCsv() throws Exception {
+    NexmarkConfiguration config = smallConfig(NexmarkUtils.SideInputType.CSV);
+    queryMatchesModel(
+        "BoundedSideInputJoinTestStreaming",
+        config,
+        new BoundedSideInputJoin(config),
+        new BoundedSideInputJoinModel(config),
+        true);
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 162657)
    Time Spent: 50m  (was: 40m)

> No precommit coverage for Nexmark postcommit main entry point
> -------------------------------------------------------------
>
>                 Key: BEAM-5961
>                 URL: https://issues.apache.org/jira/browse/BEAM-5961
>             Project: Beam
>          Issue Type: Bug
>          Components: examples-nexmark
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> There's a decent amount of logic in Nexmark's {{Main}} class and 
> {{NexmarkLauncher}}, neither of which has any tests. It is extremely easy to 
> make a change that breaks the Nexmark postcommits with no signal before a PR 
> is merged.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to