http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java new file mode 100644 index 0000000..dab9518 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichSpout; +import backtype.storm.tuple.Values; +import com.alibaba.jstorm.utils.KryoSerializer; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.runners.jstorm.JStormPipelineOptions; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Spout implementation that wraps a Beam UnboundedSource. + * TODO: add wrapper to support metrics in UnboundedSource. 
+ */ +public class UnboundedSourceSpout extends AbstractComponent implements IRichSpout { + private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class); + + private final String description; + private final UnboundedSource source; + private final SerializedPipelineOptions serializedOptions; + private final TupleTag<?> outputTag; + + private transient JStormPipelineOptions pipelineOptions; + private transient UnboundedSource.UnboundedReader reader; + private transient SpoutOutputCollector collector; + + private volatile boolean hasNextRecord; + private AtomicBoolean activated = new AtomicBoolean(); + + private KryoSerializer<WindowedValue> serializer; + + private long lastWaterMark = 0L; + + public UnboundedSourceSpout( + String description, + UnboundedSource source, + JStormPipelineOptions options, + TupleTag<?> outputTag) { + this.description = checkNotNull(description, "description"); + this.source = checkNotNull(source, "source"); + this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options")); + this.outputTag = checkNotNull(outputTag, "outputTag"); + } + + @Override + public synchronized void close() { + try { + activated.set(false); + this.reader.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void activate() { + activated.set(true); + + } + + @Override + public void deactivate() { + activated.set(false); + } + + @Override + public void ack(Object msgId) { + throw new UnsupportedOperationException(); + } + + @Override + public void fail(Object msgId) { + throw new UnsupportedOperationException(); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + try { + this.collector = collector; + this.pipelineOptions = + this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class); + + createSourceReader(null); + + 
this.serializer = new KryoSerializer<>(conf); + } catch (IOException e) { + throw new RuntimeException("Unable to create unbounded reader.", e); + } + } + + public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException { + if (reader != null) { + reader.close(); + } + reader = this.source.createReader(this.pipelineOptions, checkpointMark); + hasNextRecord = this.reader.start(); + } + + @Override + public synchronized void nextTuple() { + if (!activated.get()) { + return; + } + try { + if (!hasNextRecord) { + hasNextRecord = reader.advance(); + } + + while (hasNextRecord && activated.get()) { + Object value = reader.getCurrent(); + Instant timestamp = reader.getCurrentTimestamp(); + + WindowedValue wv = + WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING); + LOG.debug("Source output: " + wv.getValue()); + if (keyedEmit(outputTag.getId())) { + KV kv = (KV) wv.getValue(); + // Convert WindowedValue<KV> to <K, WindowedValue<V>> + byte[] immutableValue = serializer.serialize(wv.withValue(kv.getValue())); + collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableValue)); + } else { + byte[] immutableValue = serializer.serialize(wv); + collector.emit(outputTag.getId(), new Values(immutableValue)); + } + + // move to next record + hasNextRecord = reader.advance(); + } + + Instant waterMark = reader.getWatermark(); + if (waterMark != null && lastWaterMark < waterMark.getMillis()) { + lastWaterMark = waterMark.getMillis(); + collector.flush(); + collector.emit(CommonInstance.BEAM_WATERMARK_STREAM_ID, new Values(waterMark.getMillis())); + LOG.debug("Source output: WM-{}", waterMark.toDateTime()); + } + } catch (IOException e) { + throw new RuntimeException("Exception reading values from source.", e); + } + } + + public UnboundedSource getUnboundedSource() { + return source; + } + + public UnboundedSource.UnboundedReader getUnboundedSourceReader() { + return reader; + } + + @Override + public String 
toString() { + return description; + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java new file mode 100644 index 0000000..54c9b94 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Translates a Read.Unbounded into a Storm spout. 
+ * + * @param <T> + */ +class UnboundedSourceTranslator<T> extends TransformTranslator.Default<Read.Unbounded<T>> { + public void translateNode(Read.Unbounded<T> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + String description = + describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); + + TupleTag<?> tag = userGraphContext.getOutputTag(); + PValue output = userGraphContext.getOutput(); + + UnboundedSourceSpout spout = new UnboundedSourceSpout( + description, + transform.getSource(), userGraphContext.getOptions(), tag); + context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewExecutor.java new file mode 100644 index 0000000..822ed8a --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewExecutor.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * JStorm {@link Executor} for {@link View}. + */ +class ViewExecutor implements Executor { + + private final String description; + private final TupleTag outputTag; + private ExecutorsBolt executorsBolt; + + public ViewExecutor(String description, TupleTag outputTag) { + this.description = description; + this.outputTag = outputTag; + } + + @Override + public void init(ExecutorContext context) { + this.executorsBolt = context.getExecutorsBolt(); + } + + @Override + public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) { + executorsBolt.processExecutorElem(outputTag, elem); + } + + @Override + public void cleanup() { + } + + @Override + public String toString() { + return description; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java new file mode 100644 index 0000000..9ab5784 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; + +/** + * A {@link TransformTranslator} for executing {@link View Views} in JStorm runner. 
+ */ +class ViewTranslator + extends TransformTranslator.Default<ViewTranslator.CreateJStormPCollectionView<?, ?>> { + @Override + public void translateNode( + CreateJStormPCollectionView<?, ?> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + String description = describeTransform( + transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); + ViewExecutor viewExecutor = new ViewExecutor(description, userGraphContext.getOutputTag()); + context.addTransformExecutor(viewExecutor); + } + + /** + * Specialized implementation for + * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}. + */ + public static class ViewAsMap<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { + + @SuppressWarnings("unused") // used via reflection in JstormRunner#apply() + public ViewAsMap(View.AsMap<K, V> transform) { + } + + @Override + public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) { + PCollectionView<Map<K, V>> view = + PCollectionViews.mapView( + input, + input.getWindowingStrategy(), + input.getCoder()); + + @SuppressWarnings({"rawtypes", "unchecked"}) + KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); + try { + inputCoder.getKeyCoder().verifyDeterministic(); + } catch (Coder.NonDeterministicException e) { + // TODO: log warning as other runners. + } + + return input + .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) + .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, V>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsMap"; + } + } + + /** + * Specialized expansion for {@link + * View.AsMultimap View.AsMultimap}. + */ + public static class ViewAsMultimap<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { + + /** + * Builds an instance of this class from the overridden transform. 
+ */ + @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() + public ViewAsMultimap(View.AsMultimap<K, V> transform) { + } + + @Override + public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { + PCollectionView<Map<K, Iterable<V>>> view = + PCollectionViews.multimapView( + input, + input.getWindowingStrategy(), + input.getCoder()); + + @SuppressWarnings({"rawtypes", "unchecked"}) + KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); + try { + inputCoder.getKeyCoder().verifyDeterministic(); + } catch (Coder.NonDeterministicException e) { + // TODO: log warning as other runners. + } + + return input + .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) + .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsMultimap"; + } + } + + /** + * Specialized implementation for + * {@link View.AsList View.AsList}. + */ + public static class ViewAsList<T> + extends PTransform<PCollection<T>, PCollectionView<List<T>>> { + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() + public ViewAsList(View.AsList<T> transform) { + } + + @Override + public PCollectionView<List<T>> expand(PCollection<T> input) { + PCollectionView<List<T>> view = + PCollectionViews.listView( + input, + input.getWindowingStrategy(), + input.getCoder()); + + return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) + .apply(CreateJStormPCollectionView.<T, List<T>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsList"; + } + } + + /** + * Specialized implementation for + * {@link View.AsIterable View.AsIterable} for the + * JStorm runner in streaming mode. 
+ */ + public static class ViewAsIterable<T> + extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> { + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() + public ViewAsIterable(View.AsIterable<T> transform) { + } + + @Override + public PCollectionView<Iterable<T>> expand(PCollection<T> input) { + PCollectionView<Iterable<T>> view = + PCollectionViews.iterableView( + input, + input.getWindowingStrategy(), + input.getCoder()); + + return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) + .apply(CreateJStormPCollectionView.<T, Iterable<T>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsIterable"; + } + } + + /** + * Specialized expansion for + * {@link View.AsSingleton View.AsSingleton} for the + * JStorm runner in streaming mode. + */ + public static class ViewAsSingleton<T> + extends PTransform<PCollection<T>, PCollectionView<T>> { + private View.AsSingleton<T> transform; + + /** + * Builds an instance of this class from the overridden transform. 
+ */ + @SuppressWarnings("unused") // used via reflection in JStormRunner#apply() + public ViewAsSingleton(View.AsSingleton<T> transform) { + this.transform = transform; + } + + @Override + public PCollectionView<T> expand(PCollection<T> input) { + Combine.Globally<T, T> combine = Combine.globally( + new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue())); + if (!transform.hasDefaultValue()) { + combine = combine.withoutDefaults(); + } + return input.apply(combine.asSingletonView()); + } + + @Override + protected String getKindString() { + return "StreamingViewAsSingleton"; + } + + private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> { + private boolean hasDefaultValue; + private T defaultValue; + + SingletonCombine(boolean hasDefaultValue, T defaultValue) { + this.hasDefaultValue = hasDefaultValue; + this.defaultValue = defaultValue; + } + + @Override + public T apply(T left, T right) { + throw new IllegalArgumentException("PCollection with more than one element " + + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to " + + "combine the PCollection into a single value"); + } + + @Override + public T identity() { + if (hasDefaultValue) { + return defaultValue; + } else { + throw new IllegalArgumentException( + "Empty PCollection accessed as a singleton view. " + + "Consider setting withDefault to provide a default value"); + } + } + } + } + + /** + * Specialized expansion for + * {@link org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView}. + * @param <InputT> + * @param <OutputT> + */ + public static class CombineGloballyAsSingletonView<InputT, OutputT> + extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> { + Combine.GloballyAsSingletonView<InputT, OutputT> transform; + + /** + * Builds an instance of this class from the overridden transform. 
+ */ + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() + public CombineGloballyAsSingletonView( + Combine.GloballyAsSingletonView<InputT, OutputT> transform) { + this.transform = transform; + } + + @Override + public PCollectionView<OutputT> expand(PCollection<InputT> input) { + PCollection<OutputT> combined = + input.apply(Combine.globally(transform.getCombineFn()) + .withoutDefaults() + .withFanout(transform.getFanout())); + + PCollectionView<OutputT> view = PCollectionViews.singletonView( + combined, + combined.getWindowingStrategy(), + transform.getInsertDefault(), + transform.getInsertDefault() + ? transform.getCombineFn().defaultValue() : null, + combined.getCoder()); + return combined + .apply(ParDo.of(new WrapAsList<OutputT>())) + .apply(CreateJStormPCollectionView.<OutputT, OutputT>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingCombineGloballyAsSingletonView"; + } + } + + private static class WrapAsList<T> extends DoFn<T, List<T>> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(Collections.singletonList(c.element())); + } + } + + /** + * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs. + * They require the input {@link PCollection} fits in memory. + * For a large {@link PCollection} this is expected to crash! + * + * @param <T> the type of elements to concatenate. 
+ */ + private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> { + private static final long serialVersionUID = 1L; + + @Override + public List<T> createAccumulator() { + return new ArrayList<>(); + } + + @Override + public List<T> addInput(List<T> accumulator, T input) { + accumulator.add(input); + return accumulator; + } + + @Override + public List<T> mergeAccumulators(Iterable<List<T>> accumulators) { + List<T> result = createAccumulator(); + for (List<T> accumulator : accumulators) { + result.addAll(accumulator); + } + return result; + } + + @Override + public List<T> extractOutput(List<T> accumulator) { + return accumulator; + } + + @Override + public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) { + return ListCoder.of(inputCoder); + } + + @Override + public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) { + return ListCoder.of(inputCoder); + } + } + + /** + * Creates a primitive {@link PCollectionView}. + * For internal use only by runner implementors. 
+ * + * @param <ElemT> The type of the elements of the input PCollection + * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input + */ + public static class CreateJStormPCollectionView<ElemT, ViewT> + extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> { + private PCollectionView<ViewT> view; + + private CreateJStormPCollectionView(PCollectionView<ViewT> view) { + this.view = view; + } + + public static <ElemT, ViewT> CreateJStormPCollectionView<ElemT, ViewT> of( + PCollectionView<ViewT> view) { + return new CreateJStormPCollectionView<>(view); + } + + @Override + public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) { + return view; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java new file mode 100644 index 0000000..8d60392 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.Iterables; +import java.util.Collection; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}. 
+ * @param <T> + * @param <W> + */ +class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor { + private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class); + + private final String description; + private WindowFn<T, W> windowFn; + private ExecutorsBolt executorsBolt; + private TupleTag outputTag; + + class JStormAssignContext<InputT, W extends BoundedWindow> + extends WindowFn<InputT, W>.AssignContext { + private final WindowedValue<InputT> value; + + JStormAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) { + fn.super(); + checkArgument( + Iterables.size(value.getWindows()) == 1, + String.format( + "%s passed to window assignment must be in a single window, but it was in %s: %s", + WindowedValue.class.getSimpleName(), + Iterables.size(value.getWindows()), + value.getWindows())); + this.value = value; + } + + @Override + public InputT element() { + return value.getValue(); + } + + @Override + public Instant timestamp() { + return value.getTimestamp(); + } + + @Override + public BoundedWindow window() { + return Iterables.getOnlyElement(value.getWindows()); + } + } + + public WindowAssignExecutor(String description, WindowFn<T, W> windowFn, TupleTag outputTag) { + this.description = description; + this.windowFn = windowFn; + this.outputTag = outputTag; + } + + @Override + public void init(ExecutorContext context) { + this.executorsBolt = context.getExecutorsBolt(); + } + + @Override + public void process(TupleTag tag, WindowedValue elem) { + Collection<W> windows = null; + try { + windows = windowFn.assignWindows(new JStormAssignContext<>(windowFn, elem)); + for (W window : windows) { + executorsBolt.processExecutorElem( + outputTag, + WindowedValue.of(elem.getValue(), elem.getTimestamp(), window, elem.getPane())); + } + } catch (Exception e) { + LOG.warn("Failed to assign windows for elem=" + elem, e); + } + } + + @Override + public void cleanup() { + } + + + @Override + public String toString() { + 
return description; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignTranslator.java new file mode 100644 index 0000000..86cb638 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignTranslator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import org.apache.beam.sdk.transforms.windowing.Window; + +/** + * Translates a {@link org.apache.beam.sdk.transforms.windowing.Window.Assign} to a + * JStorm {@link WindowAssignExecutor}. 
+ * @param <T> + */ +class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> { + + @Override + public void translateNode(Window.Assign<T> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + String description = + describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); + context.getUserGraphContext().setWindowed(); + WindowAssignExecutor executor = new WindowAssignExecutor( + description, + transform.getWindowFn(), + userGraphContext.getOutputTag()); + context.addTransformExecutor(executor); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/package-info.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/package-info.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/package-info.java new file mode 100644 index 0000000..f8f2f3f --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Translation of Beam pipelines into JStorm topologies: translators, spouts and executors. + */ +package org.apache.beam.runners.jstorm.translation; http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java deleted file mode 100644 index 3d7fab8..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import backtype.storm.topology.IComponent; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import java.util.HashMap; -import java.util.Map; -import org.apache.beam.runners.jstorm.translation.util.CommonInstance; - -/** - * Enable user to add output stream definitions by API, rather than hard-code. - */ -public abstract class AbstractComponent implements IComponent { - private Map<String, Fields> streamToFields = new HashMap<>(); - private Map<String, Boolean> keyStreams = new HashMap<>(); - private int parallelismNum = 0; - - public void addOutputField(String streamId) { - addOutputField(streamId, new Fields(CommonInstance.VALUE)); - } - - public void addOutputField(String streamId, Fields fields) { - streamToFields.put(streamId, fields); - keyStreams.put(streamId, false); - } - - public void addKVOutputField(String streamId) { - streamToFields.put(streamId, new Fields(CommonInstance.KEY, CommonInstance.VALUE)); - keyStreams.put(streamId, true); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - for (Map.Entry<String, Fields> entry : streamToFields.entrySet()) { - declarer.declareStream(entry.getKey(), entry.getValue()); - } - } - - public boolean keyedEmit(String streamId) { - Boolean isKeyedStream = keyStreams.get(streamId); - return isKeyedStream == null ?
false : isKeyedStream; - } - - public int getParallelismNum() { - return parallelismNum; - } - - public void setParallelismNum(int num) { - parallelismNum = num; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java deleted file mode 100644 index e07d890..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java +++ /dev/null @@ -1,343 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.alibaba.jstorm.cache.IKvStoreManager; -import com.alibaba.jstorm.metric.MetricClient; -import com.google.common.collect.Iterables; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; -import org.apache.beam.runners.core.NullSideInputReader; -import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; -import org.apache.beam.runners.core.SideInputHandler; -import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; -import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.runners.core.StateNamespaces; -import org.apache.beam.runners.core.StateTag; -import org.apache.beam.runners.core.StateTags; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.runners.jstorm.JStormPipelineOptions; -import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals; -import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals; -import org.apache.beam.runners.jstorm.translation.util.DefaultStepContext; -import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.state.BagState; -import org.apache.beam.sdk.state.WatermarkHoldState; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; -import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import 
org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * JStorm {@link Executor} for {@link DoFn}. - * @param <InputT> input type - * @param <OutputT> output type - */ -public class DoFnExecutor<InputT, OutputT> implements Executor { - private static final long serialVersionUID = 5297603063991078668L; - - private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class); - - /** - * Implements {@link OutputManager} in a DoFn executor. - */ - public class DoFnExecutorOutputManager implements OutputManager, Serializable { - private static final long serialVersionUID = -661113364735206170L; - - @Override - public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { - executorsBolt.processExecutorElem(tag, output); - } - } - - protected transient DoFnRunner<InputT, OutputT> runner = null; - protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackRunner = null; - - protected final String stepName; - - protected int internalDoFnExecutorId; - - protected final String description; - - protected final TupleTag<OutputT> mainTupleTag; - protected final List<TupleTag<?>> sideOutputTags; - - protected SerializedPipelineOptions serializedOptions; - protected transient JStormPipelineOptions pipelineOptions; - - protected DoFn<InputT, OutputT> doFn; - protected final Coder<WindowedValue<InputT>> inputCoder; - protected DoFnInvoker<InputT, OutputT> doFnInvoker; - protected OutputManager outputManager; - protected WindowingStrategy<?, ?> windowingStrategy; - protected final TupleTag<InputT> mainInputTag; - protected Collection<PCollectionView<?>> sideInputs; - protected SideInputHandler sideInputHandler; - protected final Map<TupleTag, 
PCollectionView<?>> sideInputTagToView; - - // Initialize during runtime - protected ExecutorContext executorContext; - protected ExecutorsBolt executorsBolt; - protected TimerInternals timerInternals; - protected transient StateInternals pushbackStateInternals; - protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag; - protected transient StateTag<WatermarkHoldState> watermarkHoldTag; - protected transient IKvStoreManager kvStoreManager; - protected DefaultStepContext stepContext; - protected transient MetricClient metricClient; - - public DoFnExecutor( - String stepName, - String description, - JStormPipelineOptions pipelineOptions, - DoFn<InputT, OutputT> doFn, - Coder<WindowedValue<InputT>> inputCoder, - WindowingStrategy<?, ?> windowingStrategy, - TupleTag<InputT> mainInputTag, - Collection<PCollectionView<?>> sideInputs, - Map<TupleTag, PCollectionView<?>> sideInputTagToView, - TupleTag<OutputT> mainTupleTag, - List<TupleTag<?>> sideOutputTags) { - this.stepName = checkNotNull(stepName, "stepName"); - this.description = checkNotNull(description, "description"); - this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); - this.doFn = doFn; - this.inputCoder = inputCoder; - this.outputManager = new DoFnExecutorOutputManager(); - this.windowingStrategy = windowingStrategy; - this.mainInputTag = mainInputTag; - this.sideInputs = sideInputs; - this.mainTupleTag = mainTupleTag; - this.sideOutputTags = sideOutputTags; - this.sideInputTagToView = sideInputTagToView; - } - - protected DoFnRunner<InputT, OutputT> getDoFnRunner() { - return new DoFnRunnerWithMetrics<>( - stepName, - DoFnRunners.simpleRunner( - this.pipelineOptions, - this.doFn, - this.sideInputHandler == null ? 
NullSideInputReader.empty() : sideInputHandler, - this.outputManager, - this.mainTupleTag, - this.sideOutputTags, - this.stepContext, - this.windowingStrategy), - MetricsReporter.create(metricClient)); - } - - protected void initService(ExecutorContext context) { - // TODO: what should be set for key in here? - timerInternals = new JStormTimerInternals( - null /* key */, this, context.getExecutorsBolt().timerService()); - kvStoreManager = context.getKvStoreManager(); - stepContext = new DefaultStepContext(timerInternals, - new JStormStateInternals( - null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); - metricClient = new MetricClient(executorContext.getTopologyContext()); - } - - @Override - public void init(ExecutorContext context) { - this.executorContext = context; - this.executorsBolt = context.getExecutorsBolt(); - this.pipelineOptions = - this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class); - - initService(context); - - // Side inputs setup - if (sideInputs != null && !sideInputs.isEmpty()) { - pushedBackTag = StateTags.bag("pushed-back-values", inputCoder); - watermarkHoldTag = - StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST); - pushbackStateInternals = new JStormStateInternals( - null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId); - sideInputHandler = new SideInputHandler(sideInputs, pushbackStateInternals); - runner = getDoFnRunner(); - pushbackRunner = - SimplePushbackSideInputDoFnRunner.create(runner, sideInputs, sideInputHandler); - } else { - runner = getDoFnRunner(); - } - - // Process user's setup - doFnInvoker = DoFnInvokers.invokerFor(doFn); - doFnInvoker.invokeSetup(); - } - - @Override - public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) { - LOG.debug(String.format("process: elemTag=%s, mainInputTag=%s, sideInputs=%s, elem={}", - tag, mainInputTag, sideInputs, elem.getValue())); - if (mainInputTag.equals(tag)) { - processMainInput(elem); 
- } else { - processSideInput(tag, elem); - } - } - - protected <T> void processMainInput(WindowedValue<T> elem) { - if (sideInputs.isEmpty()) { - runner.processElement((WindowedValue<InputT>) elem); - } else { - Iterable<WindowedValue<InputT>> justPushedBack = - pushbackRunner.processElementInReadyWindows((WindowedValue<InputT>) elem); - BagState<WindowedValue<InputT>> pushedBack = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); - - Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE; - for (WindowedValue<InputT> pushedBackValue : justPushedBack) { - if (pushedBackValue.getTimestamp().isBefore(min)) { - min = pushedBackValue.getTimestamp(); - } - min = earlier(min, pushedBackValue.getTimestamp()); - pushedBack.add(pushedBackValue); - } - pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag).add(min); - } - } - - protected void processSideInput(TupleTag tag, WindowedValue elem) { - LOG.debug(String.format("side inputs: %s, %s.", tag, elem)); - - PCollectionView<?> sideInputView = sideInputTagToView.get(tag); - sideInputHandler.addSideInputValue(sideInputView, elem); - - BagState<WindowedValue<InputT>> pushedBack = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); - - List<WindowedValue<InputT>> newPushedBack = new ArrayList<>(); - - Iterable<WindowedValue<InputT>> pushedBackInputs = pushedBack.read(); - if (pushedBackInputs != null) { - for (WindowedValue<InputT> input : pushedBackInputs) { - - Iterable<WindowedValue<InputT>> justPushedBack = - pushbackRunner.processElementInReadyWindows(input); - Iterables.addAll(newPushedBack, justPushedBack); - } - } - pushedBack.clear(); - - Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE; - for (WindowedValue<InputT> pushedBackValue : newPushedBack) { - min = earlier(min, pushedBackValue.getTimestamp()); - pushedBack.add(pushedBackValue); - } - - WatermarkHoldState watermarkHold = - pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag); - // 
TODO: clear-then-add is not thread-safe. - watermarkHold.clear(); - watermarkHold.add(min); - } - - /** - * Process all pushed back elements when receiving watermark with max timestamp. - */ - public void processAllPushBackElements() { - if (sideInputs != null && !sideInputs.isEmpty()) { - BagState<WindowedValue<InputT>> pushedBackElements = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); - if (pushedBackElements != null) { - for (WindowedValue<InputT> elem : pushedBackElements.read()) { - LOG.info("Process pushback elem={}", elem); - runner.processElement(elem); - } - pushedBackElements.clear(); - } - - WatermarkHoldState watermarkHold = - pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag); - watermarkHold.clear(); - watermarkHold.add(BoundedWindow.TIMESTAMP_MAX_VALUE); - } - } - - public void onTimer(Object key, TimerInternals.TimerData timerData) { - StateNamespace namespace = timerData.getNamespace(); - checkArgument(namespace instanceof StateNamespaces.WindowNamespace); - BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow(); - if (pushbackRunner != null) { - pushbackRunner.onTimer( - timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain()); - } else { - runner.onTimer( - timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain()); - } - } - - @Override - public void cleanup() { - doFnInvoker.invokeTeardown(); - } - - @Override - public String toString() { - return description; - } - - private Instant earlier(Instant left, Instant right) { - return left.isBefore(right) ? 
left : right; - } - - public void startBundle() { - if (pushbackRunner != null) { - pushbackRunner.startBundle(); - } else { - runner.startBundle(); - } - } - - public void finishBundle() { - if (pushbackRunner != null) { - pushbackRunner.finishBundle(); - } else { - runner.finishBundle(); - } - } - - public void setInternalDoFnExecutorId(int id) { - this.internalDoFnExecutorId = id; - } - - public int getInternalDoFnExecutorId() { - return internalDoFnExecutorId; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java deleted file mode 100644 index 1610a8a..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.Closeable; -import java.io.IOException; -import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.sdk.metrics.MetricsContainer; -import org.apache.beam.sdk.metrics.MetricsEnvironment; -import org.apache.beam.sdk.state.TimeDomain; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.joda.time.Instant; - -/** - * DoFnRunner decorator which registers {@link MetricsContainer}. - */ -public class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { - - private final String stepName; - private final DoFnRunner<InputT, OutputT> delegate; - private final MetricsReporter metricsReporter; - - DoFnRunnerWithMetrics( - String stepName, - DoFnRunner<InputT, OutputT> delegate, - MetricsReporter metricsReporter) { - this.stepName = checkNotNull(stepName, "stepName"); - this.delegate = checkNotNull(delegate, "delegate"); - this.metricsReporter = checkNotNull(metricsReporter, "metricsReporter"); - } - - @Override - public void startBundle() { - try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( - metricsReporter.getMetricsContainer(stepName))) { - delegate.startBundle(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void processElement(WindowedValue<InputT> elem) { - try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( - metricsReporter.getMetricsContainer(stepName))) { - delegate.processElement(elem); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void onTimer( - String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) { - try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( - metricsReporter.getMetricsContainer(stepName))) { - delegate.onTimer(timerId, window, 
timestamp, timeDomain); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void finishBundle() { - try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( - metricsReporter.getMetricsContainer(stepName))) { - delegate.finishBundle(); - } catch (IOException e) { - throw new RuntimeException(e); - } - metricsReporter.updateMetrics(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java deleted file mode 100644 index 0ec4fdd..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * An executor is a basic executable unit in a JStorm task.
- *
- * <p>Executors are chained inside an {@link ExecutorsBolt}, which hands each element to the
- * executor registered for the element's input tag.
- */
-public interface Executor extends Serializable {
-  /**
-   * Initialization during runtime, once the bolt's {@link ExecutorContext} exists.
-   * {@link ExecutorsBolt} invokes this from its {@code prepare} method.
-   */
-  void init(ExecutorContext context);
-
-  /**
-   * Processes one element that arrived under {@code tag}.
-   *
-   * @param tag the input tag the element was received on
-   * @param elem the windowed element
-   */
-  <T> void process(TupleTag<T> tag, WindowedValue<T> elem);
-
-  /**
-   * Releases executor resources; {@link ExecutorsBolt} invokes this from its own
-   * {@code cleanup} method.
-   */
-  void cleanup();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
deleted file mode 100644
index 55ca171..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import backtype.storm.task.TopologyContext;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.google.auto.value.AutoValue;
-
-/**
- * Runtime context of an executors bolt, handed to each {@link Executor#init}.
- *
- * <p>NOTE: AutoValue derives the generated constructor's parameter order from the declaration
- * order of the abstract getters below, so the arguments in {@link #of} must stay in that order.
- */
-@AutoValue
-public abstract class ExecutorContext {
-  /** Creates a context from the topology context, the owning bolt and its KV store manager. */
-  public static ExecutorContext of(
-      TopologyContext topologyContext,
-      ExecutorsBolt bolt,
-      IKvStoreManager kvStoreManager) {
-    return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager);
-  }
-
-  public abstract TopologyContext getTopologyContext();
-
-  public abstract ExecutorsBolt getExecutorsBolt();
-
-  public abstract IKvStoreManager getKvStoreManager();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
deleted file mode 100644
index 0366c13..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import static com.google.common.base.Preconditions.checkNotNull; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBatchBolt; -import backtype.storm.tuple.ITupleExt; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import com.alibaba.jstorm.cache.IKvStoreManager; -import com.alibaba.jstorm.cache.KvStoreManagerFactory; -import com.alibaba.jstorm.cluster.Common; -import com.alibaba.jstorm.utils.KryoSerializer; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.beam.runners.jstorm.translation.util.CommonInstance; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * ExecutorsBolt is a JStorm Bolt composited with several executors chained in a sub-DAG. 
- */ -public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { - private static final long serialVersionUID = -7751043327801735211L; - - private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class); - - protected ExecutorContext executorContext; - - protected TimerService timerService; - - // map from input tag to executor inside bolt - protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap(); - // set of all output tags that will be emit outside bolt - protected final Set<TupleTag> outputTags = Sets.newHashSet(); - protected final Set<TupleTag> externalOutputTags = Sets.newHashSet(); - protected final Set<DoFnExecutor> doFnExecutors = Sets.newHashSet(); - protected int internalDoFnExecutorId = 1; - protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap(); - - protected OutputCollector collector; - - protected boolean isStatefulBolt = false; - - protected KryoSerializer<WindowedValue> serializer; - - public ExecutorsBolt() { - - } - - public void setStatefulBolt(boolean isStateful) { - isStatefulBolt = isStateful; - } - - public void addExecutor(TupleTag inputTag, Executor executor) { - inputTagToExecutor.put( - checkNotNull(inputTag, "inputTag"), - checkNotNull(executor, "executor")); - } - - public Map<TupleTag, Executor> getExecutors() { - return inputTagToExecutor; - } - - public void registerExecutor(Executor executor) { - if (executor instanceof DoFnExecutor) { - DoFnExecutor doFnExecutor = (DoFnExecutor) executor; - idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor); - doFnExecutor.setInternalDoFnExecutorId(internalDoFnExecutorId); - internalDoFnExecutorId++; - } - } - - public Map<Integer, DoFnExecutor> getIdToDoFnExecutor() { - return idToDoFnExecutor; - } - - public void addOutputTags(TupleTag tag) { - outputTags.add(tag); - } - - public void addExternalOutputTag(TupleTag<?> tag) { - externalOutputTags.add(tag); - } - - public Set<TupleTag> getOutputTags() { - 
return outputTags; - } - - public ExecutorContext getExecutorContext() { - return executorContext; - } - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - LOG.info("Start to prepare for task-{}", context.getThisTaskId()); - try { - this.collector = collector; - - // init kv store manager - String storeName = String.format("task-%d", context.getThisTaskId()); - String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName); - IKvStoreManager kvStoreManager = isStatefulBolt - ? KvStoreManagerFactory.getKvStoreManagerWithMonitor( - context, storeName, stateStorePath, isStatefulBolt) - : KvStoreManagerFactory.getKvStoreManager( - stormConf, storeName, stateStorePath, isStatefulBolt); - this.executorContext = ExecutorContext.of(context, this, kvStoreManager); - - // init time service - timerService = initTimerService(); - - // init all internal executors - for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) { - executor.init(executorContext); - if (executor instanceof DoFnExecutor) { - doFnExecutors.add((DoFnExecutor) executor); - } - } - - this.serializer = new KryoSerializer<WindowedValue>(stormConf); - - LOG.info("ExecutorsBolt finished init. 
LocalExecutors={}", inputTagToExecutor.values()); - LOG.info("inputTagToExecutor={}", inputTagToExecutor); - LOG.info("outputTags={}", outputTags); - LOG.info("externalOutputTags={}", externalOutputTags); - LOG.info("doFnExecutors={}", doFnExecutors); - } catch (IOException e) { - throw new RuntimeException("Failed to prepare executors bolt", e); - } - } - - public TimerService initTimerService() { - TopologyContext context = executorContext.getTopologyContext(); - List<Integer> tasks = FluentIterable.from(context.getThisSourceComponentTasks().entrySet()) - .transformAndConcat( - new Function<Map.Entry<String, List<Integer>>, Iterable<Integer>>() { - @Override - public Iterable<Integer> apply(Map.Entry<String, List<Integer>> value) { - if (Common.isSystemComponent(value.getKey())) { - return Collections.EMPTY_LIST; - } else { - return value.getValue(); - } - } - }) - .toList(); - TimerService ret = new TimerServiceImpl(executorContext); - ret.init(tasks); - return ret; - } - - @Override - public void execute(Tuple input) { - // process a batch - String streamId = input.getSourceStreamId(); - ITupleExt tuple = (ITupleExt) input; - Iterator<List<Object>> valueIterator = tuple.batchValues().iterator(); - if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) { - while (valueIterator.hasNext()) { - processWatermark((Long) valueIterator.next().get(0), input.getSourceTask()); - } - } else { - doFnStartBundle(); - while (valueIterator.hasNext()) { - processElement(valueIterator.next(), streamId); - } - doFnFinishBundle(); - } - } - - private void processWatermark(long watermarkTs, int sourceTask) { - long newWaterMark = timerService.updateInputWatermark(sourceTask, watermarkTs); - LOG.debug("Recv waterMark-{} from task-{}, newWaterMark={}", - (new Instant(watermarkTs)).toDateTime(), - sourceTask, - (new Instant(newWaterMark)).toDateTime()); - if (newWaterMark != 0) { - // Some buffer windows are going to be triggered. 
- doFnStartBundle(); - timerService.fireTimers(newWaterMark); - - // SideInput: If receiving water mark with max timestamp, It means no more data is supposed - // to be received from now on. So we are going to process all push back data. - if (newWaterMark == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { - for (DoFnExecutor doFnExecutor : doFnExecutors) { - doFnExecutor.processAllPushBackElements(); - } - } - - doFnFinishBundle(); - } - - long currentWaterMark = timerService.currentOutputWatermark(); - if (!externalOutputTags.isEmpty()) { - collector.flush(); - collector.emit( - CommonInstance.BEAM_WATERMARK_STREAM_ID, - new Values(currentWaterMark)); - LOG.debug("Send waterMark-{}", (new Instant(currentWaterMark)).toDateTime()); - } - } - - private void processElement(List<Object> values, String streamId) { - TupleTag inputTag = new TupleTag(streamId); - WindowedValue windowedValue = retrieveWindowedValueFromTupleValue(values); - processExecutorElem(inputTag, windowedValue); - } - - public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) { - LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue()); - if (elem != null) { - Executor executor = inputTagToExecutor.get(inputTag); - if (executor != null) { - executor.process(inputTag, elem); - } - if (externalOutputTags.contains(inputTag)) { - emitOutsideBolt(inputTag, elem); - } - } else { - LOG.info("Received null elem for tag={}", inputTag); - } - } - - @Override - public void cleanup() { - for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) { - executor.cleanup(); - } - executorContext.getKvStoreManager().close(); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - - public TimerService timerService() { - return timerService; - } - - public void setTimerService(TimerService service) { - timerService = service; - } - - private WindowedValue retrieveWindowedValueFromTupleValue(List<Object> values) { - 
WindowedValue wv = null; - if (values.size() > 1) { - Object key = values.get(0); - WindowedValue value = serializer.deserialize((byte[]) values.get(1)); - wv = value.withValue(KV.of(key, value.getValue())); - } else { - wv = serializer.deserialize((byte[]) values.get(0)); - } - return wv; - } - - protected void emitOutsideBolt(TupleTag outputTag, WindowedValue outputValue) { - LOG.debug("Output outside: tag={}, value={}", outputTag, outputValue.getValue()); - if (keyedEmit(outputTag.getId())) { - KV kv = (KV) outputValue.getValue(); - byte[] immutableOutputValue = serializer.serialize(outputValue.withValue(kv.getValue())); - // Convert WindowedValue<KV> to <K, WindowedValue<V>> - if (kv.getKey() == null) { - // If key is null, emit "null" string here. Because, null value will be ignored in JStorm. - collector.emit(outputTag.getId(), new Values("null", immutableOutputValue)); - } else { - collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableOutputValue)); - } - } else { - byte[] immutableOutputValue = serializer.serialize(outputValue); - collector.emit(outputTag.getId(), new Values(immutableOutputValue)); - } - } - - private void doFnStartBundle() { - for (DoFnExecutor doFnExecutor : doFnExecutors) { - doFnExecutor.startBundle(); - } - } - - private void doFnFinishBundle() { - for (DoFnExecutor doFnExecutor : doFnExecutors) { - doFnExecutor.finishBundle(); - } - } - - @Override - public String toString() { - // LOG.info("bolt: " + executorContext.getTopologyContext().toJSONString()); - List<String> ret = new ArrayList<>(); - /*ret.add("inputTags"); - for (TupleTag inputTag : inputTagToExecutor.keySet()) { - ret.add(inputTag.getId()); - }*/ - ret.add("internalExecutors"); - for (Executor executor : inputTagToExecutor.values()) { - ret.add(executor.toString()); - } - ret.add("externalOutputTags"); - for (TupleTag output : externalOutputTags) { - ret.add(output.getId()); - } - return Joiner.on('\n').join(ret).concat("\n"); - } -} 
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java deleted file mode 100644 index caf1e47..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; - -/** - * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.Flatten}. 
- * @param <InputT> - */ -public class FlattenExecutor<InputT> implements Executor { - - private final String description; - private TupleTag mainOutputTag; - private ExecutorContext context; - private ExecutorsBolt executorsBolt; - - public FlattenExecutor(String description, TupleTag mainTupleTag) { - this.description = checkNotNull(description, "description"); - this.mainOutputTag = mainTupleTag; - } - - @Override - public void init(ExecutorContext context) { - this.context = context; - this.executorsBolt = context.getExecutorsBolt(); - } - - @Override - public void process(TupleTag tag, WindowedValue elem) { - executorsBolt.processExecutorElem(mainOutputTag, elem); - } - - @Override - public void cleanup() { - } - - @Override - public String toString() { - return description; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java deleted file mode 100644 index 0dd1af9..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.collect.ImmutableList; -import java.io.Serializable; -import java.util.List; -import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; -import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.runners.core.KeyedWorkItems; -import org.apache.beam.runners.core.NullSideInputReader; -import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.runners.core.StateInternalsFactory; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.runners.core.StateNamespaces; -import org.apache.beam.runners.core.SystemReduceFn; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.runners.core.TimerInternalsFactory; -import org.apache.beam.runners.jstorm.JStormPipelineOptions; -import org.apache.beam.runners.jstorm.translation.TranslationContext; -import org.apache.beam.runners.jstorm.translation.TranslationContext.UserGraphContext; -import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals; -import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals; -import org.apache.beam.runners.jstorm.util.RunnerUtils; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import 
org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.GroupByKey}.
- *
- * <p>Each incoming {@code KV<K, V>} element is converted to a {@link KeyedWorkItem} and run
- * through a {@link GroupAlsoByWindowViaWindowSetNewDoFn} built with the
- * {@link SystemReduceFn#buffering} reduce function. The runner chain built in
- * {@link #getDoFnRunner()} is: simple runner, wrapped by a late-data-dropping runner, wrapped by
- * a metrics-reporting runner.
- *
- * @param <K> key type of the input {@code KV}s
- * @param <V> value type of the input {@code KV}s; emitted grouped as {@code Iterable<V>}
- */
-public class GroupByWindowExecutor<K, V>
-    extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> {
-  private static final long serialVersionUID = -7563050475488610553L;
-
-  // NOTE(review): LOG is not referenced anywhere in this class.
-  private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class);
-
-  /** Routes every DoFn output, whatever its tag, back into the owning bolt. */
-  private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable {
-
-    @Override
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      executorsBolt.processExecutorElem(tag, output);
-    }
-  }
-
-  // Coder of the input PCollection<KV<K, V>>; its value coder is used to build reduceFn.
-  private KvCoder<K, V> inputKvCoder;
-  // Assigned, and only read, in getGroupByWindowDoFn().
-  private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
-
-  /**
-   * Creates the executor. The input KV coder is taken from the input recorded in
-   * {@code context}'s user-graph context; the grouping DoFn itself is only built later, in
-   * {@link #getDoFnRunner()}.
-   */
-  public GroupByWindowExecutor(
-      String stepName,
-      String description,
-      TranslationContext context,
-      JStormPipelineOptions pipelineOptions,
-      WindowingStrategy<?, ?> windowingStrategy,
-      TupleTag<KV<K, Iterable<V>>> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
-    // The DoFn is created at runtime (see getDoFnRunner()), so null is passed for it here.
-    // NOTE(review): which DoFnExecutor constructor parameters the nulls fill is not visible in
-    // this file — verify against DoFnExecutor.
-    super(
-        stepName,
-        description,
-        pipelineOptions,
-        null,
-        null,
-        windowingStrategy,
-        null,
-        null,
-        null,
-        mainTupleTag,
-        sideOutputTags);
-
-    this.outputManager = new GroupByWindowOutputManager();
-    UserGraphContext userGraphContext = context.getUserGraphContext();
-    PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) userGraphContext.getInput();
-    this.inputKvCoder = (KvCoder<K, V>) input.getCoder();
-  }
-
-  /**
-   * Builds the group-also-by-window DoFn, wiring per-key state and timer factories, an empty
-   * side-input reader, the buffering reduce function and this executor's output manager.
-   */
-  private DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getGroupByWindowDoFn() {
-    // Per-key state built from kvStoreManager, the bolt's timer service and
-    // internalDoFnExecutorId.
-    final StateInternalsFactory<K> stateFactory = new StateInternalsFactory<K>() {
-      @Override
-      public StateInternals stateInternalsForKey(K key) {
-        return new JStormStateInternals<K>(
-            key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
-      }
-    };
-    // Per-key timers on the bolt's TimerService. This executor is passed in, presumably so that
-    // fired timers come back through onTimer() — verify in JStormTimerInternals.
-    TimerInternalsFactory<K> timerFactory = new TimerInternalsFactory<K>() {
-      @Override
-      public TimerInternals timerInternalsForKey(K key) {
-        return new JStormTimerInternals<>(
-            key,
-            GroupByWindowExecutor.this,
-            executorContext.getExecutorsBolt().timerService());
-      }
-    };
-
-    reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder());
-    DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFn =
-        GroupAlsoByWindowViaWindowSetNewDoFn.create(
-            windowingStrategy, stateFactory, timerFactory, NullSideInputReader.empty(),
-            (SystemReduceFn) reduceFn, outputManager, mainTupleTag);
-    return doFn;
-  }
-
-  /**
-   * Creates the DoFn (assigning the inherited {@code doFn} field first, since the simple runner
-   * reads {@code this.doFn}) and wraps it: simple runner, then late-data-dropping runner, then a
-   * metrics wrapper reporting through a new {@link MetricsReporter} on {@code metricClient}.
-   */
-  @Override
-  protected DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getDoFnRunner() {
-    doFn = getGroupByWindowDoFn();
-
-    DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> simpleRunner = DoFnRunners.simpleRunner(
-        this.pipelineOptions,
-        this.doFn,
-        NullSideInputReader.empty(),
-        this.outputManager,
-        this.mainTupleTag,
-        this.sideOutputTags,
-        this.stepContext,
-        this.windowingStrategy);
-
-    DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFnRunner =
-        DoFnRunners.lateDataDroppingRunner(
-            simpleRunner,
-            this.stepContext,
-            this.windowingStrategy);
-    return new DoFnRunnerWithMetrics<>(
-        stepName, doFnRunner, MetricsReporter.create(metricClient));
-  }
-
-  @Override
-  public void process(TupleTag tag, WindowedValue elem) {
-    // For GroupByKey, KV type elem is received. We need to convert the KV elem
-    // into KeyedWorkItem first, which is the expected type in LateDataDroppingDoFnRunner.
-    KeyedWorkItem<K, V> keyedWorkItem = RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem);
-    runner.processElement(elem.withValue(keyedWorkItem));
-  }
-
-  /**
-   * Delivers a fired timer by pushing a timers-only {@link KeyedWorkItem} for {@code key}, in the
-   * global window, through the same runner as ordinary elements.
-   *
-   * @throws IllegalArgumentException if the timer's namespace is not a window namespace
-   */
-  @Override
-  public void onTimer(Object key, TimerInternals.TimerData timerData) {
-    StateNamespace namespace = timerData.getNamespace();
-    checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
-
-    runner.processElement(
-        WindowedValue.valueInGlobalWindow(
-            KeyedWorkItems.<K, V>timersWorkItem((K) key, ImmutableList.of(timerData))));
-  }
-
-  // NOTE(review): this override only delegates to DoFnExecutor#toString and could be removed.
-  @Override
-  public String toString() {
-    return super.toString();
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java deleted file mode 100644 index a022440..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; - -import com.alibaba.jstorm.common.metric.AsmCounter; -import com.alibaba.jstorm.metric.MetricClient; -import com.google.common.collect.Maps; -import java.util.Map; -import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; -import org.apache.beam.sdk.metrics.MetricQueryResults; -import org.apache.beam.sdk.metrics.MetricResult; -import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.metrics.MetricsContainer; -import org.apache.beam.sdk.metrics.MetricsFilter; - -/** - * Class that holds a {@link MetricsContainerStepMap}, and reports metrics to JStorm engine. 
- */ -public class MetricsReporter { - - private static final String METRIC_KEY_SEPARATOR = "__"; - private static final String COUNTER_PREFIX = "__counter"; - - private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap(); - private final Map<String, Long> reportedCounters = Maps.newHashMap(); - private final MetricClient metricClient; - - public static MetricsReporter create(MetricClient metricClient) { - return new MetricsReporter(metricClient); - } - - private MetricsReporter(MetricClient metricClient) { - this.metricClient = checkNotNull(metricClient, "metricClient"); - } - - public MetricsContainer getMetricsContainer(String stepName) { - return metricsContainers.getContainer(stepName); - } - - public void updateMetrics() { - MetricResults metricResults = asAttemptedOnlyMetricResults(metricsContainers); - MetricQueryResults metricQueryResults = - metricResults.queryMetrics(MetricsFilter.builder().build()); - updateCounters(metricQueryResults.counters()); - } - - private void updateCounters(Iterable<MetricResult<Long>> counters) { - System.out.print("updateCounters"); - for (MetricResult<Long> metricResult : counters) { - String metricName = getMetricNameString(COUNTER_PREFIX, metricResult); - System.out.print("metricName: " + metricName); - Long updateValue = metricResult.attempted(); - Long oldValue = reportedCounters.get(metricName); - - if (oldValue == null || oldValue < updateValue) { - AsmCounter counter = metricClient.registerCounter(metricName); - Long incValue = (oldValue == null ? updateValue : updateValue - oldValue); - counter.update(incValue); - } - } - } - - private String getMetricNameString(String prefix, MetricResult<?> metricResult) { - return prefix - + METRIC_KEY_SEPARATOR + metricResult.step() - + METRIC_KEY_SEPARATOR + metricResult.name().namespace() - + METRIC_KEY_SEPARATOR + metricResult.name().name(); - } -}
