Make TimestampTransform Serializable. Rebase leftover fixes.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f47e0eba Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f47e0eba Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f47e0eba Branch: refs/heads/master Commit: f47e0ebacd4c915e2a8c8b4e18e792926745d673 Parents: 24ab605 Author: Sela <[email protected]> Authored: Mon Feb 20 19:45:03 2017 +0200 Committer: Sela <[email protected]> Committed: Wed Mar 1 00:18:04 2017 +0200 ---------------------------------------------------------------------- .../aggregators/AggregatorsAccumulator.java | 2 +- .../spark/metrics/MetricsAccumulator.java | 4 -- .../spark/metrics/SparkMetricsContainer.java | 5 +- .../spark/stateful/SparkStateInternals.java | 16 +++++++ .../streaming/StreamingTransformTranslator.java | 2 +- .../translation/streaming/UnboundedDataset.java | 1 - .../ResumeFromCheckpointStreamingTest.java | 48 ++++++++++++++++++-- .../windowing/TimestampTransform.java | 3 +- 8 files changed, 64 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f47e0eba/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java index a4dfda6..261c327 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; /** - * For resilience, {@link Accumulator}s are required to be wrapped in a Singleton. 
+ * For resilience, {@link Accumulator Accumulators} are required to be wrapped in a Singleton. * @see <a href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#accumulators-and-broadcast-variables">accumulators</a> */ public class AggregatorsAccumulator { http://git-wip-us.apache.org/repos/asf/beam/blob/f47e0eba/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java index f27a826..9d48289 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java @@ -53,9 +53,6 @@ public class MetricsAccumulator { if (instance == null) { synchronized (MetricsAccumulator.class) { if (instance == null) { - // TODO: currently when recovering from checkpoint, Spark does not recover the - // last known Accumulator value. The SparkRunner should be able to persist and recover - // the SparkMetricsContainer in order to recover metrics as well. 
SparkMetricsContainer initialValue = new SparkMetricsContainer(); instance = jsc.sc().accumulator(initialValue, "Beam.Metrics", new MetricsAccumulatorParam()); @@ -94,7 +91,6 @@ public class MetricsAccumulator { } } - @SuppressWarnings("unused") @VisibleForTesting static void clear() { synchronized (MetricsAccumulator.class) { http://git-wip-us.apache.org/repos/asf/beam/blob/f47e0eba/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java index 0bf0e70..7a4b222 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java @@ -145,10 +145,7 @@ public class SparkMetricsContainer implements Serializable { @VisibleForTesting public static void clear() { try { - SparkMetricsContainer instance = getInstance(); - instance.initializeMetricsContainers(); - instance.counters.clear(); - instance.distributions.clear(); + MetricsAccumulator.clear(); } catch (IllegalStateException ignored) { } } http://git-wip-us.apache.org/repos/asf/beam/blob/f47e0eba/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java index 93b1f63..43fb383 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java @@ 
-38,7 +38,9 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CombineFnUtil; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.MapState; import org.apache.beam.sdk.util.state.ReadableState; +import org.apache.beam.sdk.util.state.SetState; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateContext; import org.apache.beam.sdk.util.state.StateContexts; @@ -121,6 +123,20 @@ class SparkStateInternals<K> implements StateInternals<K> { } @Override + public <T> SetState<T> bindSet(StateTag<? super K, SetState<T>> spec, Coder<T> elemCoder) { + throw new UnsupportedOperationException( + String.format("%s is not supported", SetState.class.getSimpleName())); + } + + @Override + public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( + StateTag<? super K, MapState<KeyT, ValueT>> spec, + Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) { + throw new UnsupportedOperationException( + String.format("%s is not supported", MapState.class.getSimpleName())); + } + + @Override public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue( StateTag<? 
super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, http://git-wip-us.apache.org/repos/asf/beam/blob/f47e0eba/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index a98eff2..fc98781 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -37,9 +37,9 @@ import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.io.ConsoleIO; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.io.SparkUnboundedSource; -import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; +import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet; import org.apache.beam.runners.spark.translation.BoundedDataset; import org.apache.beam.runners.spark.translation.Dataset; import org.apache.beam.runners.spark.translation.DoFnFunction; http://git-wip-us.apache.org/repos/asf/beam/blob/f47e0eba/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java index d7f3f34..e9abe93 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.List; import org.apache.beam.runners.spark.translation.Dataset; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.streaming.api.java.JavaDStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/f47e0eba/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index e307363..4eea383 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -17,15 +17,19 @@ */ package org.apache.beam.runners.spark.translation.streaming; +import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import 
com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -33,17 +37,23 @@ import org.apache.beam.runners.spark.ReuseSparkContextRule; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.SparkPipelineResult; import org.apache.beam.runners.spark.SparkRunner; -import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; +import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.coders.CoderHelpers; +import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.kafka.KafkaIO; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.Keys; @@ -52,6 +62,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import 
org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -59,6 +70,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -147,6 +159,11 @@ public class ResumeFromCheckpointStreamingTest { "k4", new Instant(400) )); + MetricsFilter metricsFilter = + MetricsFilter.builder() + .addNameFilter(MetricNameFilter.inNamespace(ResumeFromCheckpointStreamingTest.class)) + .build(); + // first run will read from Kafka backlog - "auto.offset.reset=smallest" SparkPipelineResult res = run(options); res.waitUntilFinish(Duration.standardSeconds(2)); @@ -157,11 +174,15 @@ public class ResumeFromCheckpointStreamingTest { "Expected %d processed messages count but found %d", 4, processedMessages1), processedMessages1, equalTo(4L)); + assertThat(res.metrics().queryMetrics(metricsFilter).counters(), + hasItem(attemptedMetricsResult(ResumeFromCheckpointStreamingTest.class.getName(), + "allMessages", "EOFShallNotPassFn", 4L))); //--- between executions: //- clear state. - AccumulatorSingleton.clear(); + AggregatorsAccumulator.clear(); + SparkMetricsContainer.clear(); GlobalWatermarkHolder.clear(); //- write a bit more. 
@@ -175,11 +196,14 @@ public class ResumeFromCheckpointStreamingTest { res.waitUntilFinish(Duration.standardSeconds(2)); // assertions 2: long processedMessages2 = res.getAggregatorValue("processedMessages", Long.class); - int successAssertions = res.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class); assertThat( - String.format("Expected %d processed messages count but found %d", 1, processedMessages2), + String.format("Expected %d processed messages count but found %d", 5, processedMessages2), processedMessages2, equalTo(5L)); + assertThat(res.metrics().queryMetrics(metricsFilter).counters(), + hasItem(attemptedMetricsResult(ResumeFromCheckpointStreamingTest.class.getName(), + "allMessages", "EOFShallNotPassFn", 6L))); + int successAssertions = res.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class); res.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class); assertThat( String.format( @@ -225,10 +249,14 @@ public class ResumeFromCheckpointStreamingTest { Pipeline p = Pipeline.create(options); + PCollection<String> expectedCol = + p.apply(Create.of(ImmutableList.of("side1", "side2")).withCoder(StringUtf8Coder.of())); + PCollectionView<List<String>> view = expectedCol.apply(View.<String>asList()); + PCollection<Iterable<String>> grouped = p .apply(read.withoutMetadata()) .apply(Keys.<String>create()) - .apply(ParDo.of(new EOFShallNotPassFn())) + .apply("EOFShallNotPassFn", ParDo.of(new EOFShallNotPassFn(view)).withSideInputs(view)) .apply(Window.<String>into(FixedWindows.of(Duration.millis(500))) .triggering(AfterWatermark.pastEndOfWindow()) .accumulatingFiredPanes() @@ -250,12 +278,22 @@ public class ResumeFromCheckpointStreamingTest { /** A pass-through fn that prevents EOF event from passing. 
*/ private static class EOFShallNotPassFn extends DoFn<String, String> { + final PCollectionView<List<String>> view; private final Aggregator<Long, Long> aggregator = createAggregator("processedMessages", Sum.ofLongs()); + Counter counter = + Metrics.counter(ResumeFromCheckpointStreamingTest.class, "allMessages"); + + private EOFShallNotPassFn(PCollectionView<List<String>> view) { + this.view = view; + } @ProcessElement public void process(ProcessContext c) { String element = c.element(); + // assert that side input is passed correctly before/after resuming from checkpoint. + assertThat(c.sideInput(view), containsInAnyOrder("side1", "side2")); + counter.inc(); if (!element.equals("EOF")) { aggregator.addValue(1L); c.output(c.element()); http://git-wip-us.apache.org/repos/asf/beam/blob/f47e0eba/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java index b16e968..5318592 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java @@ -18,11 +18,12 @@ package org.apache.beam.sdk.transforms.windowing; import com.google.auto.value.AutoValue; +import java.io.Serializable; import org.joda.time.Duration; import org.joda.time.Instant; /** An abstract description of a standardized transformation on timestamps. */ -public abstract class TimestampTransform { +public abstract class TimestampTransform implements Serializable { /** Returns a transform that shifts a timestamp later by {@code delay}. */ public static TimestampTransform delay(Duration delay) {
