[ 
https://issues.apache.org/jira/browse/BEAM-5716?focusedWorklogId=159181&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-159181
 ]

ASF GitHub Bot logged work on BEAM-5716:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Oct/18 12:53
            Start Date: 26/Oct/18 12:53
    Worklog Time Spent: 10m 
      Work Description: lgajowy closed pull request #6725: [BEAM-5716] 
Reorganize testing modules
URL: https://github.com/apache/beam/pull/6725
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a forked repository, GitHub no
longer shows its diff once it has been merged, so the diff is reproduced below:

diff --git a/sdks/java/testing/OWNERS b/sdks/java/testing/OWNERS
new file mode 100644
index 00000000000..afa0fc252af
--- /dev/null
+++ b/sdks/java/testing/OWNERS
@@ -0,0 +1,6 @@
+# See the OWNERS docs at https://s.apache.org/beam-owners
+
+reviewers:
+  - echauchot
+  - aromanenko-dev
+  - lgajowy
diff --git a/sdks/java/load-tests/OWNERS b/sdks/java/testing/load-tests/OWNERS
similarity index 100%
rename from sdks/java/load-tests/OWNERS
rename to sdks/java/testing/load-tests/OWNERS
diff --git a/sdks/java/load-tests/build.gradle 
b/sdks/java/testing/load-tests/build.gradle
similarity index 97%
rename from sdks/java/load-tests/build.gradle
rename to sdks/java/testing/load-tests/build.gradle
index c3a1f632b42..198148c0968 100644
--- a/sdks/java/load-tests/build.gradle
+++ b/sdks/java/testing/load-tests/build.gradle
@@ -50,6 +50,7 @@ dependencies {
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow project(path: ":beam-runners-direct-java", configuration: "shadow")
   shadow project(path: ":beam-sdks-java-io-synthetic", configuration: "shadow")
+  shadow project(path: ":beam-sdks-java-test-utils", configuration: "shadow")
 
   gradleRun project(path: project.path, configuration: "shadow")
   gradleRun project(path: runnerDependency, configuration: "shadow")
diff --git 
a/sdks/java/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
similarity index 79%
rename from 
sdks/java/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
rename to 
sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
index 89ee44db401..48e595b0b59 100644
--- 
a/sdks/java/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
+++ 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
@@ -25,16 +25,21 @@
 import java.io.IOException;
 import java.util.Optional;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
 import 
org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO.SyntheticSourceOptions;
 import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
 import org.apache.beam.sdk.io.synthetic.SyntheticStep;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testutils.metrics.MetricsReader;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -57,12 +62,12 @@
  * <p>To run it manually, use the following command:
  *
  * <pre>
- *    ./gradlew run -p sdks/java/load-tests -PloadTest.args='
+ *    ./gradlew :beam-sdks-java-load-tests:run -PloadTest.args='
  *      --fanout=1
  *      --iterations=1
  *      --sourceOptions={"numRecords":1000,...}
  *      --stepOptions={"outputRecordsPerInputRecord":2...}'
- *  </pre>
+ * </pre>
  */
 public class GroupByKeyLoadTest {
 
@@ -120,13 +125,28 @@ public static void main(String[] args) throws IOException 
{
 
     for (int branch = 0; branch < options.getFanout(); branch++) {
       applySyntheticStep(input, branch, syntheticStep)
+          .apply(ParDo.of(new Monitor()))
           .apply(format("Group by key (%s)", branch), GroupByKey.create())
           .apply(
               format("Ungroup and reiterate (%s)", branch),
               ParDo.of(new UngroupAndReiterate(options.getIterations())));
     }
 
-    pipeline.run().waitUntilFinish();
+    PipelineResult result = pipeline.run();
+    result.waitUntilFinish();
+
+    printMetrics(result);
+  }
+
+  private static void printMetrics(PipelineResult result) {
+    MetricsReader resultMetrics = new MetricsReader(result, "gbk");
+
+    long totalBytes = resultMetrics.getCounterMetric("totalBytes.count", -1);
+    long startTime = 
resultMetrics.getStartTimeMetric(System.currentTimeMillis(), "runtime");
+    long endTime = resultMetrics.getEndTimeMetric(System.currentTimeMillis(), 
"runtime");
+
+    System.out.println(String.format("Total bytes: %s", totalBytes));
+    System.out.println(String.format("Total time (millis): %s", endTime - 
startTime));
   }
 
   private static PCollection<KV<byte[], byte[]>> applySyntheticStep(
@@ -172,4 +192,19 @@ public void processElement(ProcessContext c) {
       }
     }
   }
+
+  private static class Monitor extends DoFn<KV<byte[], byte[]>, KV<byte[], 
byte[]>> {
+
+    private static final String NAMESPACE = "gbk";
+
+    private final Distribution timeDistribution = 
Metrics.distribution(NAMESPACE, "runtime");
+    private final Counter totalBytes = Metrics.counter(NAMESPACE, 
"totalBytes.count");
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      timeDistribution.update(System.currentTimeMillis());
+      totalBytes.inc(c.element().getKey().length + 
c.element().getValue().length);
+      c.output(c.element());
+    }
+  }
 }
diff --git 
a/sdks/java/load-tests/src/main/java/org/apache/beam/sdk/loadtests/package-info.java
 
b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/package-info.java
similarity index 100%
rename from 
sdks/java/load-tests/src/main/java/org/apache/beam/sdk/loadtests/package-info.java
rename to 
sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/package-info.java
diff --git a/sdks/java/nexmark/OWNERS b/sdks/java/testing/nexmark/OWNERS
similarity index 100%
rename from sdks/java/nexmark/OWNERS
rename to sdks/java/testing/nexmark/OWNERS
diff --git a/sdks/java/nexmark/build.gradle 
b/sdks/java/testing/nexmark/build.gradle
similarity index 98%
rename from sdks/java/nexmark/build.gradle
rename to sdks/java/testing/nexmark/build.gradle
index 5ee76179be7..328a16912db 100644
--- a/sdks/java/nexmark/build.gradle
+++ b/sdks/java/testing/nexmark/build.gradle
@@ -48,6 +48,7 @@ dependencies {
   shadow project(path: 
":beam-sdks-java-extensions-google-cloud-platform-core", configuration: 
"shadow")
   shadow project(path: ":beam-sdks-java-extensions-sql", configuration: 
"shadow")
   shadow project(path: ":beam-sdks-java-io-kafka", configuration: "shadow")
+  shadow project(path: ":beam-sdks-java-test-utils", configuration: "shadow")
   shadow library.java.google_api_services_bigquery
   shadow library.java.jackson_core
   shadow library.java.jackson_annotations
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Monitor.java 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Monitor.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Monitor.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Monitor.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
similarity index 90%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index 99d79c1bb5c..ed1953f819e 100644
--- 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -32,7 +32,6 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
@@ -43,11 +42,6 @@
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.metrics.DistributionResult;
-import org.apache.beam.sdk.metrics.MetricNameFilter;
-import org.apache.beam.sdk.metrics.MetricQueryResults;
-import org.apache.beam.sdk.metrics.MetricResult;
-import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.nexmark.NexmarkUtils.PubSubMode;
 import org.apache.beam.sdk.nexmark.NexmarkUtils.SourceType;
 import org.apache.beam.sdk.nexmark.model.Auction;
@@ -88,6 +82,7 @@
 import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery5;
 import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery7;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testutils.metrics.MetricsReader;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -173,79 +168,6 @@ private int maxNumWorkers() {
     return 5;
   }
 
-  /**
-   * Return the current value for a long counter, or a default value if can't 
be retrieved. Note
-   * this uses only attempted metrics because some runners don't support 
committed metrics.
-   */
-  private long getCounterMetric(
-      PipelineResult result, String namespace, String name, long defaultValue) 
{
-    MetricQueryResults metrics =
-        result
-            .metrics()
-            .queryMetrics(
-                MetricsFilter.builder()
-                    .addNameFilter(MetricNameFilter.named(namespace, name))
-                    .build());
-    Iterable<MetricResult<Long>> counters = metrics.getCounters();
-    try {
-      MetricResult<Long> metricResult = counters.iterator().next();
-      return metricResult.getAttempted();
-    } catch (NoSuchElementException e) {
-      LOG.error("Failed to get metric {}, from namespace {}", name, namespace);
-    }
-    return defaultValue;
-  }
-
-  /**
-   * Return the current value for a long counter, or a default value if can't 
be retrieved. Note
-   * this uses only attempted metrics because some runners don't support 
committed metrics.
-   */
-  private long getDistributionMetric(
-      PipelineResult result,
-      String namespace,
-      String name,
-      DistributionType distType,
-      long defaultValue) {
-    MetricQueryResults metrics =
-        result
-            .metrics()
-            .queryMetrics(
-                MetricsFilter.builder()
-                    .addNameFilter(MetricNameFilter.named(namespace, name))
-                    .build());
-    Iterable<MetricResult<DistributionResult>> distributions = 
metrics.getDistributions();
-    try {
-      MetricResult<DistributionResult> distributionResult = 
distributions.iterator().next();
-      switch (distType) {
-        case MIN:
-          return distributionResult.getAttempted().getMin();
-        case MAX:
-          return distributionResult.getAttempted().getMax();
-        default:
-          return defaultValue;
-      }
-    } catch (NoSuchElementException e) {
-      LOG.error("Failed to get distribution metric {} for namespace {}", name, 
namespace);
-    }
-    return defaultValue;
-  }
-
-  private enum DistributionType {
-    MIN,
-    MAX
-  }
-
-  /** Return the current value for a time counter, or -1 if can't be 
retrieved. */
-  private long getTimestampMetric(long now, long value) {
-    // timestamp metrics are used to monitor time of execution of transforms.
-    // If result timestamp metric is too far from now, consider that metric is 
erroneous
-
-    if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) {
-      return -1;
-    }
-    return value;
-  }
-
   /**
    * Find a 'steady state' events/sec from {@code snapshots} and store it in 
{@code perf} if found.
    */
@@ -323,69 +245,22 @@ private NexmarkPerf currentPerf(
       Monitor<?> resultMonitor) {
     NexmarkPerf perf = new NexmarkPerf();
 
-    long numEvents =
-        getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + 
".elements", -1);
-    long numEventBytes =
-        getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + 
".bytes", -1);
-    long eventStart =
-        getTimestampMetric(
-            now,
-            getDistributionMetric(
-                result,
-                eventMonitor.name,
-                eventMonitor.prefix + ".startTime",
-                DistributionType.MIN,
-                -1));
-    long eventEnd =
-        getTimestampMetric(
-            now,
-            getDistributionMetric(
-                result,
-                eventMonitor.name,
-                eventMonitor.prefix + ".endTime",
-                DistributionType.MAX,
-                -1));
-
-    long numResults =
-        getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + 
".elements", -1);
-    long numResultBytes =
-        getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + 
".bytes", -1);
-    long resultStart =
-        getTimestampMetric(
-            now,
-            getDistributionMetric(
-                result,
-                resultMonitor.name,
-                resultMonitor.prefix + ".startTime",
-                DistributionType.MIN,
-                -1));
-    long resultEnd =
-        getTimestampMetric(
-            now,
-            getDistributionMetric(
-                result,
-                resultMonitor.name,
-                resultMonitor.prefix + ".endTime",
-                DistributionType.MAX,
-                -1));
+    MetricsReader eventMetrics = new MetricsReader(result, eventMonitor.name);
+
+    long numEvents = eventMetrics.getCounterMetric(eventMonitor.prefix + 
".elements", -1);
+    long numEventBytes = eventMetrics.getCounterMetric(eventMonitor.prefix + 
".bytes", -1);
+    long eventStart = eventMetrics.getStartTimeMetric(now, eventMonitor.prefix 
+ ".startTime");
+    long eventEnd = eventMetrics.getEndTimeMetric(now, eventMonitor.prefix + 
".endTime");
+
+    MetricsReader resultMetrics = new MetricsReader(result, 
resultMonitor.name);
+
+    long numResults = resultMetrics.getCounterMetric(resultMonitor.prefix + 
".elements", -1);
+    long numResultBytes = resultMetrics.getCounterMetric(resultMonitor.prefix 
+ ".bytes", -1);
+    long resultStart = resultMetrics.getStartTimeMetric(now, 
resultMonitor.prefix + ".startTime");
+    long resultEnd = resultMetrics.getEndTimeMetric(now, resultMonitor.prefix 
+ ".endTime");
     long timestampStart =
-        getTimestampMetric(
-            now,
-            getDistributionMetric(
-                result,
-                resultMonitor.name,
-                resultMonitor.prefix + ".startTimestamp",
-                DistributionType.MIN,
-                -1));
-    long timestampEnd =
-        getTimestampMetric(
-            now,
-            getDistributionMetric(
-                result,
-                resultMonitor.name,
-                resultMonitor.prefix + ".endTimestamp",
-                DistributionType.MAX,
-                -1));
+        resultMetrics.getStartTimeMetric(now, resultMonitor.prefix + 
".startTimestamp");
+    long timestampEnd = resultMetrics.getEndTimeMetric(now, 
resultMonitor.prefix + ".endTimestamp");
 
     long effectiveEnd = -1;
     if (eventEnd >= 0 && resultEnd >= 0) {
@@ -569,7 +444,7 @@ private NexmarkPerf monitor(NexmarkQuery query) {
 
       if (options.isStreaming() && !waitingForShutdown) {
         Duration quietFor = new Duration(lastActivityMsSinceEpoch, now);
-        long fatalCount = getCounterMetric(job, query.getName(), "fatal", 0);
+        long fatalCount = new MetricsReader(job, 
query.getName()).getCounterMetric("fatal", 0);
         if (fatalCount > 0) {
           NexmarkUtils.console("job has fatal errors, cancelling.");
           errors.add(String.format("Pipeline reported %s fatal errors", 
fatalCount));
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkPerf.java 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkPerf.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkPerf.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkPerf.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionBid.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionBid.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionBid.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionBid.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/BidsPerSession.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/BidsPerSession.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/BidsPerSession.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/BidsPerSession.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/CategoryPrice.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/CategoryPrice.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/CategoryPrice.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/CategoryPrice.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Done.java 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Done.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Done.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Done.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/IdNameReserve.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/IdNameReserve.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/IdNameReserve.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/IdNameReserve.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/KnownSize.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/KnownSize.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/KnownSize.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/KnownSize.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/SellerPrice.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/SellerPrice.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/SellerPrice.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/SellerPrice.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/package-info.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/package-info.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/package-info.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/package-info.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/RowSize.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/RowSize.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/RowSize.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/RowSize.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/SelectEvent.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/SelectEvent.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/SelectEvent.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/SelectEvent.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/package-info.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/package-info.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/package-info.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/adapter/package-info.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/package-info.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/package-info.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/package-info.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/package-info.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/package-info.java 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/package-info.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/package-info.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/package-info.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AbstractSimulator.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AbstractSimulator.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AbstractSimulator.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AbstractSimulator.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AuctionOrBid.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AuctionOrBid.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AuctionOrBid.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AuctionOrBid.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query11.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query11.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query11.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query11.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query12.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query12.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query12.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query12.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3Model.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3Model.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3Model.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4Model.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4Model.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4Model.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5Model.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5Model.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5Model.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8Model.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8Model.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8Model.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9Model.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9Model.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9Model.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBidsSimulator.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBidsSimulator.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBidsSimulator.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBidsSimulator.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/package-info.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/package-info.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/package-info.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/package-info.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/NexmarkSqlQuery.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/NexmarkSqlQuery.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/NexmarkSqlQuery.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/NexmarkSqlQuery.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery1.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery1.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery1.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery1.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery2.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery2.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery2.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery2.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery3.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery3.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery3.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery3.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery5.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery5.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery5.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery5.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery7.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery7.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery7.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery7.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/package-info.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/package-info.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/package-info.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/package-info.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSource.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/Generator.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/Generator.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/Generator.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/Generator.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorCheckpoint.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorCheckpoint.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorCheckpoint.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorCheckpoint.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorConfig.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorConfig.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorConfig.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/GeneratorConfig.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/AuctionGenerator.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/AuctionGenerator.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/AuctionGenerator.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/AuctionGenerator.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/BidGenerator.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/BidGenerator.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/BidGenerator.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/BidGenerator.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/LongGenerator.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/LongGenerator.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/LongGenerator.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/LongGenerator.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PersonGenerator.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PersonGenerator.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PersonGenerator.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PersonGenerator.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PriceGenerator.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PriceGenerator.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PriceGenerator.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/PriceGenerator.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/StringsGenerator.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/StringsGenerator.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/StringsGenerator.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/StringsGenerator.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/package-info.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/package-info.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/package-info.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/model/package-info.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/package-info.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/package-info.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/package-info.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/generator/package-info.java
diff --git 
a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/package-info.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/package-info.java
similarity index 100%
rename from 
sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/package-info.java
rename to 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/package-info.java
diff --git a/sdks/java/nexmark/src/main/resources/log4j.properties 
b/sdks/java/testing/nexmark/src/main/resources/log4j.properties
similarity index 100%
rename from sdks/java/nexmark/src/main/resources/log4j.properties
rename to sdks/java/testing/nexmark/src/main/resources/log4j.properties
diff --git 
a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
similarity index 100%
rename from 
sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
rename to 
sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
diff --git 
a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/RowSizeTest.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/RowSizeTest.java
similarity index 100%
rename from 
sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/RowSizeTest.java
rename to 
sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/RowSizeTest.java
diff --git 
a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java
similarity index 100%
rename from 
sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java
rename to 
sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java
diff --git 
a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0Test.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0Test.java
similarity index 100%
rename from 
sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0Test.java
rename to 
sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0Test.java
diff --git 
a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery1Test.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery1Test.java
similarity index 100%
rename from 
sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery1Test.java
rename to 
sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery1Test.java
diff --git 
a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery2Test.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery2Test.java
similarity index 100%
rename from 
sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery2Test.java
rename to 
sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery2Test.java
diff --git 
a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery3Test.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery3Test.java
similarity index 100%
rename from 
sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery3Test.java
rename to 
sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery3Test.java
diff --git 
a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery5Test.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery5Test.java
similarity index 100%
rename from 
sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery5Test.java
rename to 
sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery5Test.java
diff --git 
a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery7Test.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery7Test.java
similarity index 100%
rename from 
sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery7Test.java
rename to 
sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery7Test.java
diff --git 
a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java
similarity index 100%
rename from 
sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java
rename to 
sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/BoundedEventSourceTest.java
diff --git 
a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java
similarity index 100%
rename from 
sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java
rename to 
sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/GeneratorTest.java
diff --git 
a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
similarity index 100%
rename from 
sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
rename to 
sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java
diff --git a/sdks/java/testing/test-utils/OWNERS 
b/sdks/java/testing/test-utils/OWNERS
new file mode 100644
index 00000000000..2a140107225
--- /dev/null
+++ b/sdks/java/testing/test-utils/OWNERS
@@ -0,0 +1,5 @@
+# See the OWNERS docs at https://s.apache.org/beam-owners
+
+reviewers:
+  - lgajowy
+  - kkucharc
diff --git a/sdks/java/testing/test-utils/build.gradle 
b/sdks/java/testing/test-utils/build.gradle
new file mode 100644
index 00000000000..c76591f2141
--- /dev/null
+++ b/sdks/java/testing/test-utils/build.gradle
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: org.apache.beam.gradle.BeamModulePlugin
+applyJavaNature()
+
+description = "Apache Beam :: SDKs :: Java :: Test Utils"
+
+dependencies {
+  shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
+
+  shadowTest library.java.junit
+  shadowTest library.java.mockito_core
+  shadowTest library.java.hamcrest_core
+  shadowTest library.java.hamcrest_library
+  shadowTest project(path: ":beam-runners-direct-java", configuration: 
"shadowTest")
+}
diff --git 
a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/MetricsReader.java
 
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/MetricsReader.java
new file mode 100644
index 00000000000..77dccc03261
--- /dev/null
+++ 
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/MetricsReader.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testutils.metrics;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.joda.time.Duration;
+import org.slf4j.LoggerFactory;
+
+/** Provides methods for querying metrics from {@link PipelineResult} per 
namespace. */
+public class MetricsReader {
+
+  private static final org.slf4j.Logger LOG = 
LoggerFactory.getLogger(MetricsReader.class);
+
+  private enum DistributionType {
+    MIN,
+    MAX
+  }
+
+  private final PipelineResult result;
+
+  private final String namespace;
+
+  public MetricsReader(PipelineResult result, String namespace) {
+    this.result = result;
+    this.namespace = namespace;
+  }
+
+  /**
+   * Return the current value for a long counter, or a default value if can't 
be retrieved. Note
+   * this uses only attempted metrics because some runners don't support 
committed metrics.
+   */
+  public long getCounterMetric(String name, long defaultValue) {
+    MetricQueryResults metrics =
+        result
+            .metrics()
+            .queryMetrics(
+                MetricsFilter.builder()
+                    .addNameFilter(MetricNameFilter.named(namespace, name))
+                    .build());
+    Iterable<MetricResult<Long>> counters = metrics.getCounters();
+
+    checkIfMetricResultIsUnique(name, counters);
+
+    try {
+      MetricResult<Long> metricResult = counters.iterator().next();
+      return metricResult.getAttempted();
+    } catch (NoSuchElementException e) {
+      LOG.error("Failed to get metric {}, from namespace {}", name, namespace);
+    }
+    return defaultValue;
+  }
+
+  /**
+   * Return start time metric by counting the difference between "now" and min 
value from a
+   * distribution metric.
+   */
+  public long getStartTimeMetric(long now, String name) {
+    return this.getTimestampMetric(now, this.getDistributionMetric(name, 
DistributionType.MIN, -1));
+  }
+
+  /**
+   * Return end time metric by counting the difference between "now" and MAX 
value from a
+   * distribution metric.
+   */
+  public long getEndTimeMetric(long now, String name) {
+    return this.getTimestampMetric(now, this.getDistributionMetric(name, 
DistributionType.MAX, -1));
+  }
+
+  /**
+   * Return the current value for a long counter, or a default value if can't 
be retrieved. Note
+   * this uses only attempted metrics because some runners don't support 
committed metrics.
+   */
+  private long getDistributionMetric(String name, DistributionType distType, 
long defaultValue) {
+    MetricQueryResults metrics =
+        result
+            .metrics()
+            .queryMetrics(
+                MetricsFilter.builder()
+                    .addNameFilter(MetricNameFilter.named(namespace, name))
+                    .build());
+    Iterable<MetricResult<DistributionResult>> distributions = 
metrics.getDistributions();
+
+    checkIfMetricResultIsUnique(name, distributions);
+
+    try {
+      MetricResult<DistributionResult> distributionResult = 
distributions.iterator().next();
+      switch (distType) {
+        case MIN:
+          return distributionResult.getAttempted().getMin();
+        case MAX:
+          return distributionResult.getAttempted().getMax();
+        default:
+          return defaultValue;
+      }
+    } catch (NoSuchElementException e) {
+      LOG.error("Failed to get distribution metric {} for namespace {}", name, 
namespace);
+    }
+    return defaultValue;
+  }
+
+  private <T> void checkIfMetricResultIsUnique(String name, 
Iterable<MetricResult<T>> metricResult)
+      throws IllegalStateException {
+
+    Preconditions.checkState(
+        Iterables.size(metricResult) == 1,
+        String.format("More than one metric matches name: %s in namespace 
%s.", name, namespace));
+  }
+
+  /** Return the current value for a time counter, or -1 if can't be 
retrieved. */
+  private long getTimestampMetric(long now, long value) {
+    // timestamp metrics are used to monitor time of execution of transforms.
+    // If result timestamp metric is too far from now, consider that metric is 
erroneous
+
+    if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) {
+      return -1;
+    }
+    return value;
+  }
+}
diff --git 
a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/package-info.java
 
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/package-info.java
new file mode 100644
index 00000000000..c171bee8d3a
--- /dev/null
+++ 
b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/metrics/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Metrics related testing utilities for Java SDK. */
+package org.apache.beam.sdk.testutils.metrics;
diff --git 
a/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/metrics/MetricsReaderTest.java
 
b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/metrics/MetricsReaderTest.java
new file mode 100644
index 00000000000..521317f2665
--- /dev/null
+++ 
b/sdks/java/testing/test-utils/src/test/java/org/apache/beam/sdk/testutils/metrics/MetricsReaderTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testutils.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link MetricsReader}. */
+@RunWith(JUnit4.class)
+public class MetricsReaderTest {
+
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  private static final String NAMESPACE = "Testing";
+
+  @Test
+  public void testCounterMetricReceivedFromPipelineResult() {
+    List<Integer> sampleInputData = Arrays.asList(1, 1, 1, 1, 1);
+
+    createTestPipeline(sampleInputData, new MonitorWithCounter());
+    PipelineResult result = testPipeline.run();
+
+    MetricsReader reader = new MetricsReader(result, NAMESPACE);
+
+    assertEquals(5, reader.getCounterMetric("counter", -1));
+  }
+
+  @Test
+  public void testStartTimeIsTheMinimumOfTheDistribution() {
+    List<Integer> sampleInputData = Arrays.asList(1, 2, 3, 4, 5);
+
+    createTestPipeline(sampleInputData, new MonitorWithTimeDistribution());
+    PipelineResult result = testPipeline.run();
+
+    MetricsReader reader = new MetricsReader(result, NAMESPACE);
+
+    assertEquals(1, reader.getStartTimeMetric(0, "timeDist"));
+  }
+
+  @Test
+  public void testEndTimeIsTheMaximumOfTheDistribution() {
+    List<Integer> sampleInputData = Arrays.asList(1, 2, 3, 4, 5);
+
+    createTestPipeline(sampleInputData, new MonitorWithTimeDistribution());
+
+    PipelineResult result = testPipeline.run();
+
+    MetricsReader reader = new MetricsReader(result, NAMESPACE);
+
+    assertEquals(5, reader.getEndTimeMetric(0, "timeDist"));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void 
throwsIllegalStateExceptionWhenThereAreMultipleCountersOfTheSameNameAndType() {
+    Metrics.counter(NAMESPACE, "counter");
+    Metrics.counter(NAMESPACE, "counter");
+
+    PipelineResult result = testPipeline.run();
+    MetricsReader reader = new MetricsReader(result, NAMESPACE);
+    reader.getCounterMetric("counter", -1);
+  }
+
+  @Test
+  public void testTimeIsMinusOneIfTimeMetricIsTooFarFromNow() {
+    List<Integer> sampleInputData = Arrays.asList(1, 5, 5, 5, 5);
+
+    createTestPipeline(sampleInputData, new MonitorWithTimeDistribution());
+    PipelineResult result = testPipeline.run();
+
+    MetricsReader reader = new MetricsReader(result, NAMESPACE);
+
+    assertEquals(-1, reader.getStartTimeMetric(900000000001L, "timeDist"));
+    assertEquals(-1, reader.getEndTimeMetric(900000000001L, "timeDist"));
+  }
+
+  private void createTestPipeline(List<Integer> sampleInputData, DoFn<Integer, 
Integer> monitor) {
+    testPipeline.apply(Create.of(sampleInputData)).apply(ParDo.of(monitor));
+  }
+
+  /** Counts total elements of the input data provided. */
+  private static class MonitorWithCounter extends DoFn<Integer, Integer> {
+    private final Counter elementCounter = Metrics.counter(NAMESPACE, 
"counter");
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      elementCounter.inc();
+    }
+  }
+
+  /** Simulates time flow by updating the distribution metric with input 
collection elements. */
+  private static class MonitorWithTimeDistribution extends DoFn<Integer, 
Integer> {
+    private final Distribution timeDistribution = 
Metrics.distribution(NAMESPACE, "timeDist");
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      timeDistribution.update(c.element().longValue());
+    }
+  }
+}
diff --git a/settings.gradle b/settings.gradle
index be73374a341..6f5152e1473 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -167,13 +167,13 @@ project(":beam-sdks-java-io-synthetic").dir = 
file("sdks/java/io/synthetic")
 include "beam-sdks-java-javadoc"
 project(":beam-sdks-java-javadoc").dir = file("sdks/java/javadoc")
 include "beam-sdks-java-load-tests"
-project(":beam-sdks-java-load-tests").dir = file("sdks/java/load-tests")
+project(":beam-sdks-java-load-tests").dir = 
file("sdks/java/testing/load-tests")
 include "beam-sdks-java-maven-archetypes-examples"
 project(":beam-sdks-java-maven-archetypes-examples").dir = 
file("sdks/java/maven-archetypes/examples")
 include "beam-sdks-java-maven-archetypes-starter"
 project(":beam-sdks-java-maven-archetypes-starter").dir = 
file("sdks/java/maven-archetypes/starter")
 include "beam-sdks-java-nexmark"
-project(":beam-sdks-java-nexmark").dir = file("sdks/java/nexmark")
+project(":beam-sdks-java-nexmark").dir = file("sdks/java/testing/nexmark")
 include "beam-sdks-python"
 project(":beam-sdks-python").dir = file("sdks/python")
 include "beam-sdks-python-container"
@@ -182,6 +182,8 @@ include "beam-sdks-python-container-py3"
 project(":beam-sdks-python-container-py3").dir = 
file("sdks/python/container/py3")
 include "beam-vendor-java-grpc-v1"
 project(":beam-vendor-java-grpc-v1").dir = file("vendor/java-grpc-v1")
+include "beam-sdks-java-test-utils"
+project(":beam-sdks-java-test-utils").dir = 
file("sdks/java/testing/test-utils")
 include "beam-vendor-sdks-java-extensions-protobuf"
 project(":beam-vendor-sdks-java-extensions-protobuf").dir = 
file("vendor/sdks-java-extensions-protobuf")
 include "beam-website"
diff --git a/website/src/_includes/section-menu/sdks.html 
b/website/src/_includes/section-menu/sdks.html
index 7f723ea302a..cc7f1529072 100644
--- a/website/src/_includes/section-menu/sdks.html
+++ b/website/src/_includes/section-menu/sdks.html
@@ -22,7 +22,7 @@
     </li>
     <li><a href="{{ site.baseurl }}/documentation/sdks/java-extensions/">Java 
SDK extensions</a></li>
     <li><a href="{{ site.baseurl }}/documentation/sdks/java-thirdparty/">Java 
3rd party extensions</a></li>
-    <li><a href="{{ site.baseurl }}/documentation/sdks/java/nexmark/">Nexmark 
benchmark suite</a></li>
+    <li><a href="{{ site.baseurl 
}}/documentation/sdks/java/testing/nexmark/">Nexmark benchmark suite</a></li>
   </ul>
 </li>
 
diff --git a/website/src/documentation/sdks/java.md 
b/website/src/documentation/sdks/java.md
index b0078083f0b..f01f1dd8645 100644
--- a/website/src/documentation/sdks/java.md
+++ b/website/src/documentation/sdks/java.md
@@ -45,7 +45,7 @@ The Java SDK has the following extensions:
 
 - 
[join-library]({{site.baseurl}}/documentation/sdks/java-extensions/#join-library)
 provides inner join, outer left join, and outer right join functions.
 - [sorter]({{site.baseurl}}/documentation/sdks/java-extensions/#sorter) is an 
efficient and scalable sorter for large iterables.
-- [Nexmark]({{site.baseurl}}/documentation/sdks/java/nexmark) is a benchmark 
suite that runs in batch and streaming modes.
+- [Nexmark]({{site.baseurl}}/documentation/sdks/java/testing/nexmark) is a 
benchmark suite that runs in batch and streaming modes.
 - [euphoria]({{site.baseurl}}/documentation/sdks/java/euphoria) is easy to use 
Java 8 DSL for BEAM.
 
 In addition several [3rd party Java 
libraries]({{site.baseurl}}/documentation/sdks/java-thirdparty/) exist.
diff --git a/website/src/documentation/sdks/nexmark.md 
b/website/src/documentation/sdks/nexmark.md
index 6dac601f686..bcba7279ef1 100644
--- a/website/src/documentation/sdks/nexmark.md
+++ b/website/src/documentation/sdks/nexmark.md
@@ -2,7 +2,7 @@
 layout: section
 title: "Nexmark benchmark suite"
 section_menu: section-menu/sdks.html
-permalink: /documentation/sdks/java/nexmark/
+permalink: /documentation/sdks/java/testing/nexmark/
 ---
 <!--
 Licensed under the Apache License, Version 2.0 (the "License");
@@ -79,7 +79,7 @@ We have augmented the original queries with five more:
 ## Benchmark workload configuration
 
 Here are some of the knobs of the benchmark workload (see
-[NexmarkConfiguration.java](https://github.com/apache/beam/blob/master/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java)).
+[NexmarkConfiguration.java](https://github.com/apache/beam/blob/master/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java)).
 
 These configuration items can be passed to the launch command line.
 
@@ -646,7 +646,7 @@ Submit to the cluster:
         --driver-memory 512m \
         --executor-memory 512m \
         --executor-cores 1 \
-        sdks/java/nexmark/build/libs/beam-sdks-java-nexmark-{{ 
site.release_latest }}-spark.jar \
+        sdks/java/testing/nexmark/build/libs/beam-sdks-java-nexmark-{{ 
site.release_latest }}-spark.jar \
             --runner=SparkRunner \
             --query=0 \
             --streamTimeout=60 \


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 159181)
    Time Spent: 3h  (was: 2h 50m)

> Move testing utilities to a common place
> ----------------------------------------
>
>                 Key: BEAM-5716
>                 URL: https://issues.apache.org/jira/browse/BEAM-5716
>             Project: Beam
>          Issue Type: Task
>          Components: testing
>            Reporter: Lukasz Gajowy
>            Assignee: Lukasz Gajowy
>            Priority: Major
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> This was raised on the devlist here: 
> [https://lists.apache.org/thread.html/19412a35c06d41b378c7e0116557b6eccbd7c289d86998e8ab3fadcf@%3Cdev.beam.apache.org%3E]
> Short version: there are some testing utilities in the Nexmark module that 
> could be reused in load tests (and possibly elsewhere). In addition, both 
> Nexmark and Load-tests should be stored in a common module together with the 
> utils, as they all serve a similar purpose (testing).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to