http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java index 2d80617..7f98c61 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java @@ -17,177 +17,175 @@ */ package org.apache.beam.runners.jstorm.translation.runtime; +import static com.google.common.base.Preconditions.checkNotNull; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Values; +import com.alibaba.jstorm.utils.KryoSerializer; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.jstorm.JStormPipelineOptions; import org.apache.beam.runners.jstorm.translation.util.CommonInstance; -import com.alibaba.jstorm.utils.KryoSerializer; +import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; - -import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions; - -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Values; - import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import static com.google.common.base.Preconditions.checkNotNull; - /** * Spout implementation that wraps a Beam UnboundedSource - * + * <p> * TODO: add wrapper to support metrics in UnboundedSource. */ public class UnboundedSourceSpout extends AdaptorBasicSpout { - private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class); - - private final String description; - private final UnboundedSource source; - private final SerializedPipelineOptions serializedOptions; - private final TupleTag<?> outputTag; - - private transient JStormPipelineOptions pipelineOptions; - private transient UnboundedSource.UnboundedReader reader; - private transient SpoutOutputCollector collector; - - private volatile boolean hasNextRecord; - private AtomicBoolean activated = new AtomicBoolean(); - - private KryoSerializer<WindowedValue> serializer; - - private long lastWaterMark = 0l; - - public UnboundedSourceSpout( - String description, - UnboundedSource source, - JStormPipelineOptions options, - TupleTag<?> outputTag) { - this.description = checkNotNull(description, "description"); - this.source = checkNotNull(source, "source"); - this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options")); - this.outputTag = checkNotNull(outputTag, "outputTag"); - } - - @Override - public synchronized void close() { - try { - activated.set(false); - this.reader.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - @Override - public void activate() { - activated.set(true); - - } - - @Override - public void deactivate() { - activated.set(false); - } - - @Override - public void ack(Object msgId) { - throw new UnsupportedOperationException(); + private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class); + + private final String description; + private final UnboundedSource source; + private final SerializedPipelineOptions serializedOptions; + private final TupleTag<?> outputTag; + + private transient JStormPipelineOptions pipelineOptions; + private transient UnboundedSource.UnboundedReader reader; + private transient SpoutOutputCollector collector; + + private volatile boolean hasNextRecord; + private AtomicBoolean activated = new AtomicBoolean(); + + private KryoSerializer<WindowedValue> serializer; + + private long lastWaterMark = 0l; + + public UnboundedSourceSpout( + String description, + UnboundedSource source, + JStormPipelineOptions options, + TupleTag<?> outputTag) { + this.description = checkNotNull(description, "description"); + this.source = checkNotNull(source, "source"); + this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options")); + this.outputTag = checkNotNull(outputTag, "outputTag"); + } + + @Override + public synchronized void close() { + try { + activated.set(false); + this.reader.close(); + } catch (IOException e) { + e.printStackTrace(); } - - @Override - public void fail(Object msgId) { - throw new UnsupportedOperationException(); + } + + @Override + public void activate() { + activated.set(true); + + } + + @Override + public void deactivate() { + activated.set(false); + } + + @Override + public void ack(Object msgId) { + throw new UnsupportedOperationException(); + } + + @Override + public void fail(Object msgId) { + throw new UnsupportedOperationException(); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + try { + this.collector = collector; + this.pipelineOptions = + this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class); + + createSourceReader(null); + + this.serializer = new KryoSerializer<>(conf); + } catch (IOException e) { + throw new RuntimeException("Unable to create unbounded reader.", e); } + } - @Override - public Map<String, Object> getComponentConfiguration() { - return null; + public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException { + if (reader != null) { + reader.close(); } - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - try { - this.collector = collector; - this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class); - - createSourceReader(null); - - this.serializer = new KryoSerializer<>(conf); - } catch (IOException e) { - throw new RuntimeException("Unable to create unbounded reader.", e); - } + reader = this.source.createReader(this.pipelineOptions, checkpointMark); + hasNextRecord = this.reader.start(); + } + + @Override + public synchronized void nextTuple() { + if (!activated.get()) { + return; } - - public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException { - if (reader != null) { - reader.close(); + try { + if (!hasNextRecord) { + hasNextRecord = reader.advance(); + } + + while (hasNextRecord && activated.get()) { + Object value = reader.getCurrent(); + Instant timestamp = reader.getCurrentTimestamp(); + + WindowedValue wv = + WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING); + LOG.debug("Source output: " + wv.getValue()); + if (keyedEmit(outputTag.getId())) { + KV kv = (KV) wv.getValue(); + // Convert WindowedValue<KV> to <K, WindowedValue<V>> + byte[] immutableValue = serializer.serialize(wv.withValue(kv.getValue())); + collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableValue)); + } else { + byte[] immutableValue = serializer.serialize(wv); + collector.emit(outputTag.getId(), new Values(immutableValue)); } - reader = this.source.createReader(this.pipelineOptions, checkpointMark); - hasNextRecord = this.reader.start(); - } - @Override - public synchronized void nextTuple() { - if (!activated.get()) { - return; - } - try { - if (!hasNextRecord) { - hasNextRecord = reader.advance(); - } - - while (hasNextRecord && activated.get()) { - Object value = reader.getCurrent(); - Instant timestamp = reader.getCurrentTimestamp(); - - WindowedValue wv = WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING); - LOG.debug("Source output: " + wv.getValue()); - if (keyedEmit(outputTag.getId())) { - KV kv = (KV) wv.getValue(); - // Convert WindowedValue<KV> to <K, WindowedValue<V>> - byte[] immutableValue = serializer.serialize(wv.withValue(kv.getValue())); - collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableValue)); - } else { - byte[] immutableValue = serializer.serialize(wv); - collector.emit(outputTag.getId(), new Values(immutableValue)); - } - - // move to next record - hasNextRecord = reader.advance(); - } - - Instant waterMark = reader.getWatermark(); - if (waterMark != null && lastWaterMark < waterMark.getMillis()) { - lastWaterMark = waterMark.getMillis(); - collector.flush(); - collector.emit(CommonInstance.BEAM_WATERMARK_STREAM_ID, new Values(waterMark.getMillis())); - LOG.debug("Source output: WM-{}", waterMark.toDateTime()); - } - } catch (IOException e) { - throw new RuntimeException("Exception reading values from source.", e); - } + // move to next record + hasNextRecord = reader.advance(); + } + + Instant waterMark = reader.getWatermark(); + if (waterMark != null && lastWaterMark < waterMark.getMillis()) { + lastWaterMark = waterMark.getMillis(); + collector.flush(); + collector.emit(CommonInstance.BEAM_WATERMARK_STREAM_ID, new Values(waterMark.getMillis())); + LOG.debug("Source output: WM-{}", waterMark.toDateTime()); + } + } catch (IOException e) { + throw new RuntimeException("Exception reading values from source.", e); } + } - public UnboundedSource getUnboundedSource() { - return source; - } + public UnboundedSource getUnboundedSource() { + return source; + } - public UnboundedSource.UnboundedReader getUnboundedSourceReader() { - return reader; - } + public UnboundedSource.UnboundedReader getUnboundedSourceReader() { + return reader; + } - @Override - public String toString() { - return description; - } + @Override + public String toString() { + return description; + } }
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java index 7b0e8db..4320967 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java @@ -26,30 +26,31 @@ import org.apache.beam.sdk.values.TupleTag; */ public class ViewExecutor implements Executor { - private final String description; - private final TupleTag outputTag; - private ExecutorsBolt executorsBolt; - - public ViewExecutor(String description, TupleTag outputTag) { - this.description = description; - this.outputTag = outputTag; - } - - @Override - public void init(ExecutorContext context) { - this.executorsBolt = context.getExecutorsBolt(); - } - - @Override - public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) { - executorsBolt.processExecutorElem(outputTag, elem); - } - - @Override - public void cleanup() {} - - @Override - public String toString() { - return description; - } + private final String description; + private final TupleTag outputTag; + private ExecutorsBolt executorsBolt; + + public ViewExecutor(String description, TupleTag outputTag) { + this.description = description; + this.outputTag = outputTag; + } + + @Override + public void init(ExecutorContext context) { + this.executorsBolt = context.getExecutorsBolt(); + } + + @Override + public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) { + executorsBolt.processExecutorElem(outputTag, elem); + } + + @Override + public void cleanup() { + } + + @Override + public String toString() { + return description; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java index a6c3c16..7f21d26 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java @@ -17,7 +17,10 @@ */ package org.apache.beam.runners.jstorm.translation.runtime; +import static com.google.common.base.Preconditions.checkArgument; + import com.google.common.collect.Iterables; +import java.util.Collection; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; @@ -26,82 +29,79 @@ import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; - -import static com.google.common.base.Preconditions.checkArgument; - public class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor { - private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class); - - private final String description; - private WindowFn<T, W> windowFn; - private ExecutorsBolt executorsBolt; - private TupleTag outputTag; - - class JStormAssignContext<InputT, W extends BoundedWindow> - extends WindowFn<InputT, W>.AssignContext { - private final WindowedValue<InputT> value; - - JStormAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) { - fn.super(); - checkArgument( - Iterables.size(value.getWindows()) == 1, - String.format( - "%s passed to window assignment must be in a single window, but it was in %s: %s", - WindowedValue.class.getSimpleName(), - Iterables.size(value.getWindows()), - value.getWindows())); - this.value = value; - } - - @Override - public InputT element() { - return value.getValue(); - } - - @Override - public Instant timestamp() { - return value.getTimestamp(); - } - - @Override - public BoundedWindow window() { - return Iterables.getOnlyElement(value.getWindows()); - } + private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class); + + private final String description; + private WindowFn<T, W> windowFn; + private ExecutorsBolt executorsBolt; + private TupleTag outputTag; + + class JStormAssignContext<InputT, W extends BoundedWindow> + extends WindowFn<InputT, W>.AssignContext { + private final WindowedValue<InputT> value; + + JStormAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) { + fn.super(); + checkArgument( + Iterables.size(value.getWindows()) == 1, + String.format( + "%s passed to window assignment must be in a single window, but it was in %s: %s", + WindowedValue.class.getSimpleName(), + Iterables.size(value.getWindows()), + value.getWindows())); + this.value = value; } - public WindowAssignExecutor(String description, WindowFn<T, W> windowFn, TupleTag outputTag) { - this.description = description; - this.windowFn = windowFn; - this.outputTag = outputTag; + @Override + public InputT element() { + return value.getValue(); } @Override - public void init(ExecutorContext context) { - this.executorsBolt = context.getExecutorsBolt(); + public Instant timestamp() { + return value.getTimestamp(); } @Override - public void process(TupleTag tag, WindowedValue elem) { - Collection<W> windows = null; - try { - windows = windowFn.assignWindows(new JStormAssignContext<>(windowFn, elem)); - for (W window: windows) { - executorsBolt.processExecutorElem( - outputTag, - WindowedValue.of(elem.getValue(), elem.getTimestamp(), window, elem.getPane())); - } - } catch (Exception e) { - LOG.warn("Failed to assign windows for elem=" + elem, e); - } + public BoundedWindow window() { + return Iterables.getOnlyElement(value.getWindows()); } + } + + public WindowAssignExecutor(String description, WindowFn<T, W> windowFn, TupleTag outputTag) { + this.description = description; + this.windowFn = windowFn; + this.outputTag = outputTag; + } + + @Override + public void init(ExecutorContext context) { + this.executorsBolt = context.getExecutorsBolt(); + } + + @Override + public void process(TupleTag tag, WindowedValue elem) { + Collection<W> windows = null; + try { + windows = windowFn.assignWindows(new JStormAssignContext<>(windowFn, elem)); + for (W window : windows) { + executorsBolt.processExecutorElem( + outputTag, + WindowedValue.of(elem.getValue(), elem.getTimestamp(), window, elem.getPane())); + } + } catch (Exception e) { + LOG.warn("Failed to assign windows for elem=" + elem, e); + } + } - @Override - public void cleanup() {} + @Override + public void cleanup() { + } - @Override - public String toString() { - return description; - } + @Override + public String toString() { + return description; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java index eaf0549..1466f35 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,162 +17,161 @@ */ package org.apache.beam.runners.jstorm.translation.runtime.state; +import static com.google.common.base.Preconditions.checkNotNull; + import com.alibaba.jstorm.cache.ComposedKey; import com.alibaba.jstorm.cache.IKvStore; import com.alibaba.jstorm.cache.KvStoreIterable; +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; +import javax.annotation.Nullable; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.ReadableState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.Iterator; -import java.util.NoSuchElementException; - -import static com.google.common.base.Preconditions.checkNotNull; - /** * JStorm implementation of {@link BagState}. */ class JStormBagState<K, T> implements BagState<T> { - private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class); - - @Nullable - private final K key; - private final StateNamespace namespace; - private final IKvStore<ComposedKey, T> kvState; - private final IKvStore<ComposedKey, Object> stateInfoKvState; - private int elemIndex; - - public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState, - IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException { - this.key = key; - this.namespace = checkNotNull(namespace, "namespace"); - this.kvState = checkNotNull(kvState, "kvState"); - this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState"); - - Integer index = (Integer) stateInfoKvState.get(getComposedKey()); - this.elemIndex = index != null ? ++index : 0; + private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class); + + @Nullable + private final K key; + private final StateNamespace namespace; + private final IKvStore<ComposedKey, T> kvState; + private final IKvStore<ComposedKey, Object> stateInfoKvState; + private int elemIndex; + + public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState, + IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException { + this.key = key; + this.namespace = checkNotNull(namespace, "namespace"); + this.kvState = checkNotNull(kvState, "kvState"); + this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState"); + + Integer index = (Integer) stateInfoKvState.get(getComposedKey()); + this.elemIndex = index != null ? ++index : 0; + } + + @Override + public void add(T input) { + try { + kvState.put(getComposedKey(elemIndex), input); + stateInfoKvState.put(getComposedKey(), elemIndex); + elemIndex++; + } catch (IOException e) { + throw new RuntimeException(e.getCause()); } - - @Override - public void add(T input) { - try { - kvState.put(getComposedKey(elemIndex), input); - stateInfoKvState.put(getComposedKey(), elemIndex); - elemIndex++; - } catch (IOException e) { - throw new RuntimeException(e.getCause()); - } + } + + @Override + public ReadableState<Boolean> isEmpty() { + return new ReadableState<Boolean>() { + @Override + public Boolean read() { + return elemIndex <= 0; + } + + @Override + public ReadableState<Boolean> readLater() { + // TODO: support prefetch. + return this; + } + }; + } + + @Override + public Iterable<T> read() { + return new BagStateIterable(elemIndex); + } + + @Override + public BagState readLater() { + // TODO: support prefetch. + return this; + } + + @Override + public void clear() { + try { + for (int i = 0; i < elemIndex; i++) { + kvState.remove(getComposedKey(i)); + } + stateInfoKvState.remove(getComposedKey()); + elemIndex = 0; + } catch (IOException e) { + throw new RuntimeException(e.getCause()); } + } - @Override - public ReadableState<Boolean> isEmpty() { - return new ReadableState<Boolean>() { - @Override - public Boolean read() { - return elemIndex <= 0; - } - - @Override - public ReadableState<Boolean> readLater() { - // TODO: support prefetch. - return this; - } - }; - } + private ComposedKey getComposedKey() { + return ComposedKey.of(key, namespace); + } - @Override - public Iterable<T> read() { - return new BagStateIterable(elemIndex); - } + private ComposedKey getComposedKey(int elemIndex) { + return ComposedKey.of(key, namespace, elemIndex); + } - @Override - public BagState readLater() { - // TODO: support prefetch. - return this; - } + private class BagStateIterable implements KvStoreIterable<T> { - @Override - public void clear() { + private class BagStateIterator implements Iterator<T> { + private final int size; + private int cursor = 0; + + BagStateIterator() { + Integer s = null; try { - for (int i = 0; i < elemIndex; i++) { - kvState.remove(getComposedKey(i)); - } - stateInfoKvState.remove(getComposedKey()); - elemIndex = 0; + s = (Integer) stateInfoKvState.get(getComposedKey()); } catch (IOException e) { - throw new RuntimeException(e.getCause()); + LOG.error("Failed to get elemIndex for key={}", getComposedKey()); + } + this.size = s != null ? ++s : 0; + } + + @Override + public boolean hasNext() { + return cursor < size; + } + + @Override + public T next() { + if (cursor >= size) { + throw new NoSuchElementException(); } - } - - private ComposedKey getComposedKey() { - return ComposedKey.of(key, namespace); - } - - private ComposedKey getComposedKey(int elemIndex) { - return ComposedKey.of(key, namespace, elemIndex); - } - private class BagStateIterable implements KvStoreIterable<T> { - - private class BagStateIterator implements Iterator<T> { - private final int size; - private int cursor = 0; - - BagStateIterator() { - Integer s = null; - try { - s = (Integer) stateInfoKvState.get(getComposedKey()); - } catch (IOException e) { - LOG.error("Failed to get elemIndex for key={}", getComposedKey()); - } - this.size = s != null ? ++s : 0; - } - - @Override - public boolean hasNext() { - return cursor < size; - } - - @Override - public T next() { - if (cursor >= size) { - throw new NoSuchElementException(); - } - - T value = null; - try { - value = kvState.get(getComposedKey(cursor)); - } catch (IOException e) { - LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor)); - } - cursor++; - return value; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } + T value = null; + try { + value = kvState.get(getComposedKey(cursor)); + } catch (IOException e) { + LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor)); } + cursor++; + return value; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } - private final int size; + private final int size; - BagStateIterable(int size) { - this.size = size; - } + BagStateIterable(int size) { + this.size = size; + } - @Override - public Iterator<T> iterator() { - return new BagStateIterator(); - } + @Override + public Iterator<T> iterator() { + return new BagStateIterator(); + } - @Override - public String toString() { - return String.format("BagStateIterable: composedKey=%s", getComposedKey()); - } + @Override + public String toString() { + return String.format("BagStateIterable: composedKey=%s", getComposedKey()); } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java index b0fe29b..7c6a239 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,7 +20,6 @@ package org.apache.beam.runners.jstorm.translation.runtime.state; import static com.google.common.base.Preconditions.checkNotNull; import javax.annotation.Nullable; - import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.CombiningState; import org.apache.beam.sdk.state.ReadableState; @@ -30,59 +29,60 @@ import org.apache.beam.sdk.transforms.Combine; * JStorm implementation of {@link CombiningState}. */ public class JStormCombiningState<InputT, AccumT, OutputT> - implements CombiningState<InputT, AccumT, OutputT> { + implements CombiningState<InputT, AccumT, OutputT> { + + @Nullable + private final BagState<AccumT> accumBagState; + private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn; - @Nullable - private final BagState<AccumT> accumBagState; - private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn; - JStormCombiningState( - BagState<AccumT> accumBagState, - Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { - this.accumBagState = checkNotNull(accumBagState, "accumBagState"); - this.combineFn = checkNotNull(combineFn, "combineFn"); - } + JStormCombiningState( + BagState<AccumT> accumBagState, + Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { + this.accumBagState = checkNotNull(accumBagState, "accumBagState"); + this.combineFn = checkNotNull(combineFn, "combineFn"); + } - @Override - public AccumT getAccum() { - // TODO: replacing the accumBagState with the merged accum. - return combineFn.mergeAccumulators(accumBagState.read()); - } + @Override + public AccumT getAccum() { + // TODO: replacing the accumBagState with the merged accum. + return combineFn.mergeAccumulators(accumBagState.read()); + } - @Override - public void addAccum(AccumT accumT) { - accumBagState.add(accumT); - } + @Override + public void addAccum(AccumT accumT) { + accumBagState.add(accumT); + } - @Override - public AccumT mergeAccumulators(Iterable<AccumT> iterable) { - return combineFn.mergeAccumulators(iterable); - } + @Override + public AccumT mergeAccumulators(Iterable<AccumT> iterable) { + return combineFn.mergeAccumulators(iterable); + } - @Override - public void add(InputT input) { - accumBagState.add( - combineFn.addInput(combineFn.createAccumulator(), input)); - } + @Override + public void add(InputT input) { + accumBagState.add( + combineFn.addInput(combineFn.createAccumulator(), input)); + } - @Override - public ReadableState<Boolean> isEmpty() { - return accumBagState.isEmpty(); - } + @Override + public ReadableState<Boolean> isEmpty() { + return accumBagState.isEmpty(); + } - @Override - public OutputT read() { - return combineFn.extractOutput( - combineFn.mergeAccumulators(accumBagState.read())); - } + @Override + public OutputT read() { + return combineFn.extractOutput( + combineFn.mergeAccumulators(accumBagState.read())); + } - @Override - public CombiningState<InputT, AccumT, OutputT> readLater() { - // TODO: support prefetch. - return this; - } + @Override + public CombiningState<InputT, AccumT, OutputT> readLater() { + // TODO: support prefetch. + return this; + } - @Override - public void clear() { - accumBagState.clear(); - } + @Override + public void clear() { + accumBagState.clear(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java index f101beb..f1c1ed0 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,137 +18,136 @@ package org.apache.beam.runners.jstorm.translation.runtime.state; import com.alibaba.jstorm.cache.IKvStore; +import java.io.IOException; +import java.util.Map; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.sdk.state.MapState; import org.apache.beam.sdk.state.ReadableState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Map; - public class JStormMapState<K, V> implements MapState<K, V> { - private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class); - - private final K key; - private final StateNamespace namespace; - private IKvStore<K, V> kvStore; - - public JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) { - this.key = key; - this.namespace = namespace; - this.kvStore = kvStore; + private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class); + + private final K key; + private final StateNamespace namespace; + private IKvStore<K, V> kvStore; + + public JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) { + this.key = key; + this.namespace = namespace; + this.kvStore = kvStore; + } + + @Override + public void put(K var1, V var2) { + try { + kvStore.put(var1, var2); + } catch (IOException e) { + reportError(String.format("Failed to put key=%s, value=%s", var1, var2), e); } - - @Override - public void put(K var1, V var2) { - try { - kvStore.put(var1, var2); - } catch (IOException e) { - reportError(String.format("Failed to put key=%s, value=%s", var1, var2), e); - } + } + + @Override + public ReadableState<V> putIfAbsent(K var1, V var2) { + ReadableState<V> ret = null; + try { + V value = kvStore.get(var1); + if (value == null) { + kvStore.put(var1, var2); + ret = new MapReadableState<>(null); + } else { + ret = new MapReadableState<>(value); + } + } catch (IOException e) { + reportError(String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e); } - - @Override - public ReadableState<V> putIfAbsent(K var1, V var2) { - ReadableState<V> ret = null; - try { - V value = kvStore.get(var1); - if (value == null) { - kvStore.put(var1, var2); - ret = new MapReadableState<>(null); - } else { - ret = new MapReadableState<>(value); - } - } catch (IOException e) { - reportError(String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e); - } - return ret; + return ret; + } + + @Override + public void remove(K var1) { + try { + kvStore.remove(var1); + } catch (IOException e) { + reportError(String.format("Failed to remove key=%s", var1), e); } - - @Override - public void remove(K var1) { - try { - kvStore.remove(var1); - } catch (IOException e) { - reportError(String.format("Failed to remove key=%s", var1), e); - } + } + + @Override + public ReadableState<V> get(K var1) { + ReadableState<V> ret = new MapReadableState<>(null); + try { + ret = new MapReadableState(kvStore.get(var1)); + } catch (IOException e) { + reportError(String.format("Failed to get value for key=%s", var1), e); } - - @Override - public ReadableState<V> get(K var1) { - ReadableState<V> ret = new MapReadableState<>(null); - try { - ret = new MapReadableState(kvStore.get(var1)); - } catch (IOException e) { - reportError(String.format("Failed to get value for key=%s", var1), e); - } - return ret; + return ret; + } + + @Override + public ReadableState<Iterable<K>> keys() { + ReadableState<Iterable<K>> ret = new MapReadableState<>(null); + try { + ret = new MapReadableState<>(kvStore.keys()); + } catch (IOException e) { + reportError(String.format("Failed to get keys"), e); } - - @Override - public ReadableState<Iterable<K>> keys() { - ReadableState<Iterable<K>> ret = new MapReadableState<>(null); - try { - ret = new MapReadableState<>(kvStore.keys()); - } catch (IOException e) { - reportError(String.format("Failed to get keys"), e); - } - return ret; + return ret; + } + + @Override + public ReadableState<Iterable<V>> values() { + ReadableState<Iterable<V>> ret = new MapReadableState<>(null); + try { + ret = new MapReadableState<>(kvStore.values()); + } catch (IOException e) { + reportError(String.format("Failed to get values"), e); } - - @Override - public ReadableState<Iterable<V>> values() { - ReadableState<Iterable<V>> ret = new MapReadableState<>(null); - try { - ret = new MapReadableState<>(kvStore.values()); - } catch (IOException e) { - reportError(String.format("Failed to get values"), e); - } - return ret; - } - - @Override - public ReadableState<Iterable<Map.Entry<K, V>>> entries() { - ReadableState<Iterable<Map.Entry<K, V>>> ret = new MapReadableState<>(null); - try { - ret = new MapReadableState<>(kvStore.entries()); - } catch (IOException e) { - reportError(String.format("Failed to get values"), e); - } - return ret; + return ret; + } + + @Override + public ReadableState<Iterable<Map.Entry<K, V>>> entries() { + ReadableState<Iterable<Map.Entry<K, V>>> ret = new MapReadableState<>(null); + try { + ret = new MapReadableState<>(kvStore.entries()); + } catch (IOException e) { + reportError(String.format("Failed to get values"), e); } - - @Override - public void clear() { - try { - Iterable<K> keys = kvStore.keys(); - kvStore.removeBatch(keys); - } catch (IOException e) { - reportError(String.format("Failed to clear map state"), e); - } + return ret; + } + + @Override + public void clear() { + try { + Iterable<K> keys = kvStore.keys(); + kvStore.removeBatch(keys); + } catch (IOException e) { + reportError(String.format("Failed to clear map state"), e); } + } - private void reportError(String errorInfo, IOException e) { - LOG.error(errorInfo, e); - throw new RuntimeException(errorInfo); - } + private void reportError(String errorInfo, IOException e) { + LOG.error(errorInfo, e); + throw new RuntimeException(errorInfo); + } - private class MapReadableState<T> implements ReadableState<T> { - private T value; + private class MapReadableState<T> implements ReadableState<T> { + private T value; - public MapReadableState(T value) { - this.value = value; - } + public MapReadableState(T value) { + this.value = value; + } - @Override - public T read() { - return value; - } + @Override + public T read() { + return value; + } - @Override - public ReadableState<T> readLater() { - return this; - } + @Override + public ReadableState<T> readLater() { + return this; } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java index 8a0cb73..80ef3a2 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,13 +17,16 @@ */ package org.apache.beam.runners.jstorm.translation.runtime.state; -import org.apache.beam.runners.jstorm.translation.runtime.TimerService; +import static com.google.common.base.Preconditions.checkNotNull; + import com.alibaba.jstorm.cache.ComposedKey; import com.alibaba.jstorm.cache.IKvStoreManager; - +import java.io.IOException; +import javax.annotation.Nullable; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.jstorm.translation.runtime.TimerService; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.CombiningState; @@ -41,151 +44,148 @@ import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.joda.time.Instant; -import javax.annotation.Nullable; -import java.io.IOException; - -import static com.google.common.base.Preconditions.checkNotNull; - /** * JStorm implementation of {@link StateInternals}. */ public class JStormStateInternals<K> implements StateInternals { - private static final String STATE_INFO = "state-info:"; - - @Nullable - private final K key; - private final IKvStoreManager kvStoreManager; - private final TimerService timerService; - private final int executorId; - - public JStormStateInternals(K key, IKvStoreManager kvStoreManager, - TimerService timerService, int executorId) { - this.key = key; - this.kvStoreManager = checkNotNull(kvStoreManager, "kvStoreManager"); - this.timerService = checkNotNull(timerService, "timerService"); - this.executorId = executorId; - } - - @Nullable - @Override - public K getKey() { - return key; - } - - @Override - public <T extends State> T state( - StateNamespace namespace, StateTag<T> address, StateContext<?> c) { - // throw new UnsupportedOperationException("StateContext is not supported."); - /** - * TODOï¼ - * Same implementation as state() which is without StateContext. This might be updated after - * we figure out if we really need StateContext for JStorm state internals. - */ - return state(namespace, address); - } - - @Override - public <T extends State> T state(final StateNamespace namespace, StateTag<T> address) { - return address.getSpec().bind(address.getId(), new StateBinder() { - @Override - public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) { - try { - return new JStormValueState<>( - getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id))); - } catch (IOException e) { - throw new RuntimeException(); - } - } - - @Override - public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) { - try { - return new JStormBagState( - getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)), - kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id))); - } catch (IOException e) { - throw new RuntimeException(); + private static final String STATE_INFO = "state-info:"; + + @Nullable + private final K key; + private final IKvStoreManager kvStoreManager; + private final TimerService timerService; + private final int executorId; + + public JStormStateInternals(K key, IKvStoreManager kvStoreManager, + TimerService timerService, int executorId) { + this.key = key; + this.kvStoreManager = checkNotNull(kvStoreManager, "kvStoreManager"); + this.timerService = checkNotNull(timerService, "timerService"); + this.executorId = executorId; + } + + @Nullable + @Override + public K getKey() { + return key; + } + + @Override + public <T extends State> T state( + StateNamespace namespace, StateTag<T> address, StateContext<?> c) { + // throw new UnsupportedOperationException("StateContext is not supported."); + /** + * TODOï¼ + * Same implementation as state() which is without StateContext. This might be updated after + * we figure out if we really need StateContext for JStorm state internals. + */ + return state(namespace, address); + } + + @Override + public <T extends State> T state(final StateNamespace namespace, StateTag<T> address) { + return address.getSpec().bind(address.getId(), new StateBinder() { + @Override + public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) { + try { + return new JStormValueState<>( + getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id))); + } catch (IOException e) { + throw new RuntimeException(); + } + } + + @Override + public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) { + try { + return new JStormBagState( + getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)), + kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id))); + } catch (IOException e) { + throw new RuntimeException(); + } + } + + @Override + public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) { + throw new UnsupportedOperationException(); + } + + @Override + public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( + String id, + StateSpec<MapState<KeyT, ValueT>> spec, + Coder<KeyT> mapKeyCoder, + Coder<ValueT> mapValueCoder) { + try { + return new JStormMapState<>( + getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id))); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public <InputT, AccumT, OutputT> CombiningState bindCombining( + String id, + StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { + try { + BagState<AccumT> accumBagState = new JStormBagState( + getKey(), namespace, + kvStoreManager.<ComposedKey, AccumT>getOrCreate(getStoreId(id)), + kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id))); + return new JStormCombiningState<>(accumBagState, combineFn); + } catch (IOException e) { + throw new RuntimeException(); + } + } + + + @Override + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> + bindCombiningWithContext( + String id, + StateSpec<CombiningState<InputT, AccumT, OutputT>> stateSpec, Coder<AccumT> coder, + CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) { + throw new UnsupportedOperationException(); + } + + @Override + public WatermarkHoldState bindWatermark( + String id, + StateSpec<WatermarkHoldState> spec, + final TimestampCombiner timestampCombiner) { + try { + BagState<Combine.Holder<Instant>> accumBagState = new JStormBagState( + getKey(), namespace, + kvStoreManager.<ComposedKey, Combine.Holder<Instant>>getOrCreate(getStoreId(id)), + kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id))); + + Combine.CombineFn<Instant, Combine.Holder<Instant>, Instant> outputTimeCombineFn = + new BinaryCombineFn<Instant>() { + @Override + public Instant apply(Instant left, Instant right) { + return timestampCombiner.combine(left, right); } - } - - @Override - public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) { - throw new UnsupportedOperationException(); - } - - @Override - public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( - String id, - StateSpec<MapState<KeyT, ValueT>> spec, - Coder<KeyT> mapKeyCoder, - Coder<ValueT> mapValueCoder) { - try { - return new JStormMapState<>(getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id))); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public <InputT, AccumT, OutputT> CombiningState bindCombining( - String id, - StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, - Coder<AccumT> accumCoder, - Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { - try { - BagState<AccumT> accumBagState = new JStormBagState( - getKey(), namespace, - kvStoreManager.<ComposedKey, AccumT>getOrCreate(getStoreId(id)), - kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id))); - return new JStormCombiningState<>(accumBagState, combineFn); - } catch (IOException e) { - throw new RuntimeException(); - } - } - - - @Override - public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> - bindCombiningWithContext( - String id, - StateSpec<CombiningState<InputT, AccumT, OutputT>> stateSpec, Coder<AccumT> coder, - CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) { - throw new UnsupportedOperationException(); - } - - @Override - public WatermarkHoldState bindWatermark( - String id, - StateSpec<WatermarkHoldState> spec, - final TimestampCombiner timestampCombiner) { - try { - BagState<Combine.Holder<Instant>> accumBagState = new JStormBagState( - getKey(), namespace, - kvStoreManager.<ComposedKey, Combine.Holder<Instant>>getOrCreate(getStoreId(id)), - kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id))); - - Combine.CombineFn<Instant, Combine.Holder<Instant>, Instant> outputTimeCombineFn = - new BinaryCombineFn<Instant>() { - @Override - public Instant apply(Instant left, Instant right) { - return timestampCombiner.combine(left, right); - }}; - return new JStormWatermarkHoldState( - namespace, - new JStormCombiningState<>( - accumBagState, - outputTimeCombineFn), - timestampCombiner, - timerService); - } catch (IOException e) { - throw new RuntimeException(); - } - } - }); - } - - private String getStoreId(String stateId) { - return String.format("%s-%s", stateId, executorId); - } + }; + return new JStormWatermarkHoldState( + namespace, + new JStormCombiningState<>( + accumBagState, + outputTimeCombineFn), + timestampCombiner, + timerService); + } catch (IOException e) { + throw new RuntimeException(); + } + } + }); + } + + private String getStoreId(String stateId) { + return String.format("%s-%s", stateId, executorId); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java index 5ad3663..79ff6b4 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,66 +19,64 @@ package org.apache.beam.runners.jstorm.translation.runtime.state; import com.alibaba.jstorm.cache.ComposedKey; import com.alibaba.jstorm.cache.IKvStore; - +import java.io.IOException; +import javax.annotation.Nullable; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.sdk.state.ValueState; -import javax.annotation.Nullable; -import java.io.IOException; - /** * JStorm implementation of {@link ValueState}. */ public class JStormValueState<K, T> implements ValueState<T> { - @Nullable - private final K key; - private final StateNamespace namespace; - private final IKvStore<ComposedKey, T> kvState; + @Nullable + private final K key; + private final StateNamespace namespace; + private final IKvStore<ComposedKey, T> kvState; - JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) { - this.key = key; - this.namespace = namespace; - this.kvState = kvState; - } + JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) { + this.key = key; + this.namespace = namespace; + this.kvState = kvState; + } - @Override - public void write(T t) { - try { - kvState.put(getComposedKey(), t); - } catch (IOException e) { - throw new RuntimeException(String.format( - "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t)); - } + @Override + public void write(T t) { + try { + kvState.put(getComposedKey(), t); + } catch (IOException e) { + throw new RuntimeException(String.format( + "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t)); } + } - @Override - public T read() { - try { - return kvState.get(getComposedKey()); - } catch (IOException e) { - throw new RuntimeException(String.format( - "Failed to read key: %s, namespace: %s.", key, namespace)); - } + @Override + public T read() { + try { + return kvState.get(getComposedKey()); + } catch (IOException e) { + throw new RuntimeException(String.format( + "Failed to read key: %s, namespace: %s.", key, namespace)); } + } - @Override - public ValueState<T> readLater() { - // TODO: support prefetch. - return this; - } + @Override + public ValueState<T> readLater() { + // TODO: support prefetch. + return this; + } - @Override - public void clear() { - try { - kvState.remove(getComposedKey()); - } catch (IOException e) { - throw new RuntimeException(String.format( - "Failed to clear key: %s, namespace: %s.", key, namespace)); - } + @Override + public void clear() { + try { + kvState.remove(getComposedKey()); + } catch (IOException e) { + throw new RuntimeException(String.format( + "Failed to clear key: %s, namespace: %s.", key, namespace)); } + } - private ComposedKey getComposedKey() { - return ComposedKey.of(key, namespace); - } + private ComposedKey getComposedKey() { + return ComposedKey.of(key, namespace); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java index 659d77c..dc3ba43 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,8 +19,8 @@ package org.apache.beam.runners.jstorm.translation.runtime.state; import static com.google.common.base.Preconditions.checkNotNull; -import org.apache.beam.runners.jstorm.translation.runtime.TimerService; import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.jstorm.translation.runtime.TimerService; import org.apache.beam.sdk.state.GroupingState; import org.apache.beam.sdk.state.ReadableState; import org.apache.beam.sdk.state.WatermarkHoldState; @@ -32,52 +32,52 @@ import org.joda.time.Instant; */ public class JStormWatermarkHoldState implements WatermarkHoldState { - private final StateNamespace namespace; - private final GroupingState<Instant, Instant> watermarkHoldsState; - private final TimestampCombiner timestampCombiner; - private final TimerService timerService; + private final StateNamespace namespace; + private final GroupingState<Instant, Instant> watermarkHoldsState; + private final TimestampCombiner timestampCombiner; + private final TimerService timerService; - JStormWatermarkHoldState( - StateNamespace namespace, - GroupingState<Instant, Instant> watermarkHoldsState, - TimestampCombiner timestampCombiner, - TimerService timerService) { - this.namespace = checkNotNull(namespace, "namespace"); - this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState"); - this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner"); - this.timerService = checkNotNull(timerService, "timerService"); - } + JStormWatermarkHoldState( + StateNamespace namespace, + GroupingState<Instant, Instant> watermarkHoldsState, + TimestampCombiner timestampCombiner, + TimerService timerService) { + this.namespace = checkNotNull(namespace, "namespace"); + this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState"); + this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner"); + this.timerService = checkNotNull(timerService, "timerService"); + } - @Override - public TimestampCombiner getTimestampCombiner() { - return timestampCombiner; - } + @Override + public TimestampCombiner getTimestampCombiner() { + return timestampCombiner; + } - @Override - public void add(Instant instant) { - timerService.addWatermarkHold(namespace.stringKey(), instant); - watermarkHoldsState.add(instant); - } + @Override + public void add(Instant instant) { + timerService.addWatermarkHold(namespace.stringKey(), instant); + watermarkHoldsState.add(instant); + } - @Override - public ReadableState<Boolean> isEmpty() { - return watermarkHoldsState.isEmpty(); - } + @Override + public ReadableState<Boolean> isEmpty() { + return watermarkHoldsState.isEmpty(); + } - @Override - public Instant read() { - return watermarkHoldsState.read(); - } + @Override + public Instant read() { + return watermarkHoldsState.read(); + } - @Override - public WatermarkHoldState readLater() { - // TODO: support prefetch. - return this; - } + @Override + public WatermarkHoldState readLater() { + // TODO: support prefetch. + return this; + } - @Override - public void clear() { - timerService.clearWatermarkHold(namespace.stringKey()); - watermarkHoldsState.clear(); - } + @Override + public void clear() { + timerService.clearWatermarkHold(namespace.stringKey()); + watermarkHoldsState.clear(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java index 4b5f83c..184a957 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,83 +17,84 @@ */ package org.apache.beam.runners.jstorm.translation.runtime.timer; +import static com.google.common.base.Preconditions.checkNotNull; + +import javax.annotation.Nullable; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor; import org.apache.beam.runners.jstorm.translation.runtime.TimerService; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.sdk.state.TimeDomain; import org.joda.time.Instant; -import javax.annotation.Nullable; - -import static com.google.common.base.Preconditions.checkNotNull; - /** * JStorm implementation of {@link TimerInternals}. */ public class JStormTimerInternals<K> implements TimerInternals { - private final K key; - private final DoFnExecutor<?, ?> doFnExecutor; - private final TimerService timerService; - - - public JStormTimerInternals(@Nullable K key, DoFnExecutor<?, ?> doFnExecutor, TimerService timerService) { - this.key = key; - this.doFnExecutor = checkNotNull(doFnExecutor, "doFnExecutor"); - this.timerService = checkNotNull(timerService, "timerService"); - } - - @Override - public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) { - setTimer(TimerData.of(timerId, namespace, target, timeDomain)); - } - - @Override - @Deprecated - public void setTimer(TimerData timerData) { - timerService.setTimer(key, timerData, doFnExecutor); - } - - @Override - public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { - throw new UnsupportedOperationException( - "Canceling of a timer is not yet supported."); - } - - @Override - @Deprecated - public void deleteTimer(StateNamespace namespace, String timerId) { - throw new UnsupportedOperationException( - "Canceling of a timer is not yet supported."); - } - - @Override - @Deprecated - public void deleteTimer(TimerData timerData) { - throw new UnsupportedOperationException( - "Canceling of a timer is not yet supported."); - } - - @Override - public Instant currentProcessingTime() { - return Instant.now(); - } - - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return null; - } - - @Override - public Instant currentInputWatermarkTime() { - return new Instant(timerService.currentInputWatermark()); - } - - @Override - @Nullable - public Instant currentOutputWatermarkTime() { - return new Instant(timerService.currentOutputWatermark()); - } + private final K key; + private final DoFnExecutor<?, ?> doFnExecutor; + private final TimerService timerService; + + + public JStormTimerInternals( + @Nullable K key, DoFnExecutor<?, ?> doFnExecutor, TimerService timerService) { + this.key = key; + this.doFnExecutor = checkNotNull(doFnExecutor, "doFnExecutor"); + this.timerService = checkNotNull(timerService, "timerService"); + } + + @Override + public void setTimer( + StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) { + setTimer(TimerData.of(timerId, namespace, target, timeDomain)); + } + + @Override + @Deprecated + public void setTimer(TimerData timerData) { + timerService.setTimer(key, timerData, doFnExecutor); + } + + @Override + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { + throw new UnsupportedOperationException( + "Canceling of a timer is not yet supported."); + } + + @Override + @Deprecated + public void deleteTimer(StateNamespace namespace, String timerId) { + throw new UnsupportedOperationException( + "Canceling of a timer is not yet supported."); + } + + @Override + @Deprecated + public void deleteTimer(TimerData timerData) { + throw new UnsupportedOperationException( + "Canceling of a timer is not yet supported."); + } + + @Override + public Instant currentProcessingTime() { + return Instant.now(); + } + + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return null; + } + + @Override + public Instant currentInputWatermarkTime() { + return new Instant(timerService.currentInputWatermark()); + } + + @Override + @Nullable + public Instant currentOutputWatermarkTime() { + return new Instant(timerService.currentOutputWatermark()); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java index 9651fc2..7e7a54a 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java @@ -17,10 +17,9 @@ */ package org.apache.beam.runners.jstorm.translation.translator; +import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource; import org.apache.beam.runners.jstorm.translation.TranslationContext; import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout; - -import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TaggedPValue; @@ -33,18 +32,20 @@ import org.apache.beam.sdk.values.TupleTag; */ public class BoundedSourceTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> { - @Override - public void translateNode(Read.Bounded<T> transform, TranslationContext context) { - TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); + @Override + public void translateNode(Read.Bounded<T> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + String description = + describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); - TupleTag<?> outputTag = userGraphContext.getOutputTag(); - PValue outputValue = userGraphContext.getOutput(); - UnboundedSourceSpout spout = new UnboundedSourceSpout( - description, - new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource()), - userGraphContext.getOptions(), outputTag); + TupleTag<?> outputTag = userGraphContext.getOutputTag(); + PValue outputValue = userGraphContext.getOutput(); + UnboundedSourceSpout spout = new UnboundedSourceSpout( + description, + new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource()), + userGraphContext.getOptions(), outputTag); - context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(outputTag, outputValue)); - } + context.getExecutionGraphContext().registerSpout( + spout, TaggedPValue.of(outputTag, outputValue)); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java index c4da58a..fe5fca9 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.jstorm.translation.translator; import org.apache.beam.sdk.transforms.Combine; -public class CombineGloballyTranslator<InputT, OutputT> extends TransformTranslator.Default<Combine.Globally<InputT, OutputT>> { - +public class CombineGloballyTranslator<InputT, OutputT> + extends TransformTranslator.Default<Combine.Globally<InputT, OutputT>> { + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java index 99cbff7..c382fb7 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.jstorm.translation.translator; import org.apache.beam.sdk.transforms.Combine; -public class CombinePerKeyTranslator<K, InputT, OutputT> extends TransformTranslator.Default<Combine.PerKey<K, InputT, OutputT>> { - +public class CombinePerKeyTranslator<K, InputT, OutputT> + extends TransformTranslator.Default<Combine.PerKey<K, InputT, OutputT>> { + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java index 4558216..bf8d472 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java @@ -18,32 +18,30 @@ package org.apache.beam.runners.jstorm.translation.translator; import com.google.common.collect.Maps; -import org.apache.beam.sdk.transforms.Flatten; - +import java.util.Map; import org.apache.beam.runners.jstorm.translation.TranslationContext; import org.apache.beam.runners.jstorm.translation.runtime.FlattenExecutor; +import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; -import java.util.Map; - public class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollections<V>> { - @Override - public void translateNode(Flatten.PCollections<V> transform, TranslationContext context) { - TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + @Override + public void translateNode(Flatten.PCollections<V> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - // Since a new tag is created in PCollectionList, retrieve the real tag here. - Map<TupleTag<?>, PValue> inputs = Maps.newHashMap(); - for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) { - PCollection<V> pc = (PCollection<V>) entry.getValue(); - inputs.putAll(pc.expand()); - } - System.out.println("Real inputs: " + inputs); - System.out.println("FlattenList inputs: " + userGraphContext.getInputs()); - String description = describeTransform(transform, inputs, userGraphContext.getOutputs()); - FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag()); - context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs()); + // Since a new tag is created in PCollectionList, retrieve the real tag here. + Map<TupleTag<?>, PValue> inputs = Maps.newHashMap(); + for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) { + PCollection<V> pc = (PCollection<V>) entry.getValue(); + inputs.putAll(pc.expand()); } + System.out.println("Real inputs: " + inputs); + System.out.println("FlattenList inputs: " + userGraphContext.getInputs()); + String description = describeTransform(transform, inputs, userGraphContext.getOutputs()); + FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag()); + context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java index 6b8297b..85f96ce 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java @@ -17,53 +17,52 @@ */ package org.apache.beam.runners.jstorm.translation.translator; -import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor; import com.google.common.collect.Lists; -import org.apache.beam.sdk.transforms.GroupByKey; - +import java.util.Collections; +import java.util.List; import org.apache.beam.runners.jstorm.translation.TranslationContext; -import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor; +import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; - -import java.util.Collections; -import java.util.List; +import org.apache.beam.sdk.values.WindowingStrategy; public class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> { - // information of transform - protected PCollection<KV<K, V>> input; - protected PCollection<KV<K, Iterable<V>>> output; - protected List<TupleTag<?>> inputTags; - protected TupleTag<KV<K, Iterable<V>>> mainOutputTag; - protected List<TupleTag<?>> sideOutputTags; - protected List<PCollectionView<?>> sideInputs; - protected WindowingStrategy<?, ?> windowingStrategy; + // information of transform + protected PCollection<KV<K, V>> input; + protected PCollection<KV<K, Iterable<V>>> output; + protected List<TupleTag<?>> inputTags; + protected TupleTag<KV<K, Iterable<V>>> mainOutputTag; + protected List<TupleTag<?>> sideOutputTags; + protected List<PCollectionView<?>> sideInputs; + protected WindowingStrategy<?, ?> windowingStrategy; - @Override - public void translateNode(GroupByKey<K, V> transform, TranslationContext context) { - TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); + @Override + public void translateNode(GroupByKey<K, V> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + String description = + describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); - input = (PCollection<KV<K, V>>) userGraphContext.getInput(); - output = (PCollection<KV<K, Iterable<V>>>) userGraphContext.getOutput(); + input = (PCollection<KV<K, V>>) userGraphContext.getInput(); + output = (PCollection<KV<K, Iterable<V>>>) userGraphContext.getOutput(); - inputTags = userGraphContext.getInputTags(); - mainOutputTag = (TupleTag<KV<K, Iterable<V>>>) userGraphContext.getOutputTag(); - sideOutputTags = Lists.newArrayList(); + inputTags = userGraphContext.getInputTags(); + mainOutputTag = (TupleTag<KV<K, Iterable<V>>>) userGraphContext.getOutputTag(); + sideOutputTags = Lists.newArrayList(); - sideInputs = Collections.<PCollectionView<?>>emptyList(); - windowingStrategy = input.getWindowingStrategy(); + sideInputs = Collections.<PCollectionView<?>>emptyList(); + windowingStrategy = input.getWindowingStrategy(); - GroupByWindowExecutor<K, V> groupByWindowExecutor = new GroupByWindowExecutor<>( - userGraphContext.getStepName(), - description, - context, - context.getUserGraphContext().getOptions(), - windowingStrategy, - mainOutputTag, - sideOutputTags); - context.addTransformExecutor(groupByWindowExecutor); - } + GroupByWindowExecutor<K, V> groupByWindowExecutor = new GroupByWindowExecutor<>( + userGraphContext.getStepName(), + description, + context, + context.getUserGraphContext().getOptions(), + windowingStrategy, + mainOutputTag, + sideOutputTags); + context.addTransformExecutor(groupByWindowExecutor); + } }
