http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java new file mode 100644 index 0000000..f101beb --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation.runtime.state; + +import com.alibaba.jstorm.cache.IKvStore; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.ReadableState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; + +public class JStormMapState<K, V> implements MapState<K, V> { + private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class); + + private final K key; + private final StateNamespace namespace; + private IKvStore<K, V> kvStore; + + public JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) { + this.key = key; + this.namespace = namespace; + this.kvStore = kvStore; + } + + @Override + public void put(K var1, V var2) { + try { + kvStore.put(var1, var2); + } catch (IOException e) { + reportError(String.format("Failed to put key=%s, value=%s", var1, var2), e); + } + } + + @Override + public ReadableState<V> putIfAbsent(K var1, V var2) { + ReadableState<V> ret = null; + try { + V value = kvStore.get(var1); + if (value == null) { + kvStore.put(var1, var2); + ret = new MapReadableState<>(null); + } else { + ret = new MapReadableState<>(value); + } + } catch (IOException e) { + reportError(String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e); + } + return ret; + } + + @Override + public void remove(K var1) { + try { + kvStore.remove(var1); + } catch (IOException e) { + reportError(String.format("Failed to remove key=%s", var1), e); + } + } + + @Override + public ReadableState<V> get(K var1) { + ReadableState<V> ret = new MapReadableState<>(null); + try { + ret = new MapReadableState(kvStore.get(var1)); + } catch (IOException e) { + reportError(String.format("Failed to get value for key=%s", var1), e); + } + return ret; + } + + @Override + public ReadableState<Iterable<K>> keys() { + ReadableState<Iterable<K>> ret = new MapReadableState<>(null); 
+ try { + ret = new MapReadableState<>(kvStore.keys()); + } catch (IOException e) { + reportError(String.format("Failed to get keys"), e); + } + return ret; + } + + @Override + public ReadableState<Iterable<V>> values() { + ReadableState<Iterable<V>> ret = new MapReadableState<>(null); + try { + ret = new MapReadableState<>(kvStore.values()); + } catch (IOException e) { + reportError(String.format("Failed to get values"), e); + } + return ret; + } + + @Override + public ReadableState<Iterable<Map.Entry<K, V>>> entries() { + ReadableState<Iterable<Map.Entry<K, V>>> ret = new MapReadableState<>(null); + try { + ret = new MapReadableState<>(kvStore.entries()); + } catch (IOException e) { + reportError(String.format("Failed to get values"), e); + } + return ret; + } + + @Override + public void clear() { + try { + Iterable<K> keys = kvStore.keys(); + kvStore.removeBatch(keys); + } catch (IOException e) { + reportError(String.format("Failed to clear map state"), e); + } + } + + private void reportError(String errorInfo, IOException e) { + LOG.error(errorInfo, e); + throw new RuntimeException(errorInfo); + } + + private class MapReadableState<T> implements ReadableState<T> { + private T value; + + public MapReadableState(T value) { + this.value = value; + } + + @Override + public T read() { + return value; + } + + @Override + public ReadableState<T> readLater() { + return this; + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java new file mode 100644 index 0000000..8a0cb73 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation.runtime.state; + +import org.apache.beam.runners.jstorm.translation.runtime.TimerService; +import com.alibaba.jstorm.cache.ComposedKey; +import com.alibaba.jstorm.cache.IKvStoreManager; + +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.CombiningState; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.SetState; +import org.apache.beam.sdk.state.State; +import org.apache.beam.sdk.state.StateBinder; +import org.apache.beam.sdk.state.StateContext; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.state.WatermarkHoldState; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.joda.time.Instant; + +import javax.annotation.Nullable; +import java.io.IOException; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * JStorm implementation of {@link StateInternals}. 
+ */ +public class JStormStateInternals<K> implements StateInternals { + + private static final String STATE_INFO = "state-info:"; + + @Nullable + private final K key; + private final IKvStoreManager kvStoreManager; + private final TimerService timerService; + private final int executorId; + + public JStormStateInternals(K key, IKvStoreManager kvStoreManager, + TimerService timerService, int executorId) { + this.key = key; + this.kvStoreManager = checkNotNull(kvStoreManager, "kvStoreManager"); + this.timerService = checkNotNull(timerService, "timerService"); + this.executorId = executorId; + } + + @Nullable + @Override + public K getKey() { + return key; + } + + @Override + public <T extends State> T state( + StateNamespace namespace, StateTag<T> address, StateContext<?> c) { + // throw new UnsupportedOperationException("StateContext is not supported."); + /** + * TODOï¼ + * Same implementation as state() which is without StateContext. This might be updated after + * we figure out if we really need StateContext for JStorm state internals. 
+ */ + return state(namespace, address); + } + + @Override + public <T extends State> T state(final StateNamespace namespace, StateTag<T> address) { + return address.getSpec().bind(address.getId(), new StateBinder() { + @Override + public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) { + try { + return new JStormValueState<>( + getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id))); + } catch (IOException e) { + throw new RuntimeException(); + } + } + + @Override + public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) { + try { + return new JStormBagState( + getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)), + kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id))); + } catch (IOException e) { + throw new RuntimeException(); + } + } + + @Override + public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) { + throw new UnsupportedOperationException(); + } + + @Override + public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( + String id, + StateSpec<MapState<KeyT, ValueT>> spec, + Coder<KeyT> mapKeyCoder, + Coder<ValueT> mapValueCoder) { + try { + return new JStormMapState<>(getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id))); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public <InputT, AccumT, OutputT> CombiningState bindCombining( + String id, + StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { + try { + BagState<AccumT> accumBagState = new JStormBagState( + getKey(), namespace, + kvStoreManager.<ComposedKey, AccumT>getOrCreate(getStoreId(id)), + kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id))); + return new JStormCombiningState<>(accumBagState, combineFn); + } catch (IOException e) { + throw 
new RuntimeException(); + } + } + + + @Override + public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> + bindCombiningWithContext( + String id, + StateSpec<CombiningState<InputT, AccumT, OutputT>> stateSpec, Coder<AccumT> coder, + CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) { + throw new UnsupportedOperationException(); + } + + @Override + public WatermarkHoldState bindWatermark( + String id, + StateSpec<WatermarkHoldState> spec, + final TimestampCombiner timestampCombiner) { + try { + BagState<Combine.Holder<Instant>> accumBagState = new JStormBagState( + getKey(), namespace, + kvStoreManager.<ComposedKey, Combine.Holder<Instant>>getOrCreate(getStoreId(id)), + kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id))); + + Combine.CombineFn<Instant, Combine.Holder<Instant>, Instant> outputTimeCombineFn = + new BinaryCombineFn<Instant>() { + @Override + public Instant apply(Instant left, Instant right) { + return timestampCombiner.combine(left, right); + }}; + return new JStormWatermarkHoldState( + namespace, + new JStormCombiningState<>( + accumBagState, + outputTimeCombineFn), + timestampCombiner, + timerService); + } catch (IOException e) { + throw new RuntimeException(); + } + } + }); + } + + private String getStoreId(String stateId) { + return String.format("%s-%s", stateId, executorId); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java new file mode 100644 index 0000000..5ad3663 --- /dev/null +++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.runtime.state; + +import com.alibaba.jstorm.cache.ComposedKey; +import com.alibaba.jstorm.cache.IKvStore; + +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.sdk.state.ValueState; + +import javax.annotation.Nullable; +import java.io.IOException; + +/** + * JStorm implementation of {@link ValueState}. 
+ */ +public class JStormValueState<K, T> implements ValueState<T> { + + @Nullable + private final K key; + private final StateNamespace namespace; + private final IKvStore<ComposedKey, T> kvState; + + JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) { + this.key = key; + this.namespace = namespace; + this.kvState = kvState; + } + + @Override + public void write(T t) { + try { + kvState.put(getComposedKey(), t); + } catch (IOException e) { + throw new RuntimeException(String.format( + "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t)); + } + } + + @Override + public T read() { + try { + return kvState.get(getComposedKey()); + } catch (IOException e) { + throw new RuntimeException(String.format( + "Failed to read key: %s, namespace: %s.", key, namespace)); + } + } + + @Override + public ValueState<T> readLater() { + // TODO: support prefetch. + return this; + } + + @Override + public void clear() { + try { + kvState.remove(getComposedKey()); + } catch (IOException e) { + throw new RuntimeException(String.format( + "Failed to clear key: %s, namespace: %s.", key, namespace)); + } + } + + private ComposedKey getComposedKey() { + return ComposedKey.of(key, namespace); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java new file mode 100644 index 0000000..659d77c --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.runtime.state; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.runners.jstorm.translation.runtime.TimerService; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.sdk.state.GroupingState; +import org.apache.beam.sdk.state.ReadableState; +import org.apache.beam.sdk.state.WatermarkHoldState; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.joda.time.Instant; + +/** + * JStorm implementation of {@link WatermarkHoldState}. 
+ */ +public class JStormWatermarkHoldState implements WatermarkHoldState { + + private final StateNamespace namespace; + private final GroupingState<Instant, Instant> watermarkHoldsState; + private final TimestampCombiner timestampCombiner; + private final TimerService timerService; + + JStormWatermarkHoldState( + StateNamespace namespace, + GroupingState<Instant, Instant> watermarkHoldsState, + TimestampCombiner timestampCombiner, + TimerService timerService) { + this.namespace = checkNotNull(namespace, "namespace"); + this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState"); + this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner"); + this.timerService = checkNotNull(timerService, "timerService"); + } + + @Override + public TimestampCombiner getTimestampCombiner() { + return timestampCombiner; + } + + @Override + public void add(Instant instant) { + timerService.addWatermarkHold(namespace.stringKey(), instant); + watermarkHoldsState.add(instant); + } + + @Override + public ReadableState<Boolean> isEmpty() { + return watermarkHoldsState.isEmpty(); + } + + @Override + public Instant read() { + return watermarkHoldsState.read(); + } + + @Override + public WatermarkHoldState readLater() { + // TODO: support prefetch. 
+ return this; + } + + @Override + public void clear() { + timerService.clearWatermarkHold(namespace.stringKey()); + watermarkHoldsState.clear(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java new file mode 100644 index 0000000..4b5f83c --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation.runtime.timer; + +import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor; +import org.apache.beam.runners.jstorm.translation.runtime.TimerService; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.sdk.state.TimeDomain; +import org.joda.time.Instant; + +import javax.annotation.Nullable; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * JStorm implementation of {@link TimerInternals}. + */ +public class JStormTimerInternals<K> implements TimerInternals { + + private final K key; + private final DoFnExecutor<?, ?> doFnExecutor; + private final TimerService timerService; + + + public JStormTimerInternals(@Nullable K key, DoFnExecutor<?, ?> doFnExecutor, TimerService timerService) { + this.key = key; + this.doFnExecutor = checkNotNull(doFnExecutor, "doFnExecutor"); + this.timerService = checkNotNull(timerService, "timerService"); + } + + @Override + public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) { + setTimer(TimerData.of(timerId, namespace, target, timeDomain)); + } + + @Override + @Deprecated + public void setTimer(TimerData timerData) { + timerService.setTimer(key, timerData, doFnExecutor); + } + + @Override + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { + throw new UnsupportedOperationException( + "Canceling of a timer is not yet supported."); + } + + @Override + @Deprecated + public void deleteTimer(StateNamespace namespace, String timerId) { + throw new UnsupportedOperationException( + "Canceling of a timer is not yet supported."); + } + + @Override + @Deprecated + public void deleteTimer(TimerData timerData) { + throw new UnsupportedOperationException( + "Canceling of a timer is not yet supported."); + } + + @Override + public Instant currentProcessingTime() { + return Instant.now(); + 
} + + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return null; + } + + @Override + public Instant currentInputWatermarkTime() { + return new Instant(timerService.currentInputWatermark()); + } + + @Override + @Nullable + public Instant currentOutputWatermarkTime() { + return new Instant(timerService.currentOutputWatermark()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java new file mode 100644 index 0000000..9651fc2 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation.translator; + +import org.apache.beam.runners.jstorm.translation.TranslationContext; +import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout; + +import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Translates a {@link Read.Bounded} into a Storm spout. + * + * @param <T> + */ +public class BoundedSourceTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> { + + @Override + public void translateNode(Read.Bounded<T> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); + + TupleTag<?> outputTag = userGraphContext.getOutputTag(); + PValue outputValue = userGraphContext.getOutput(); + UnboundedSourceSpout spout = new UnboundedSourceSpout( + description, + new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource()), + userGraphContext.getOptions(), outputTag); + + context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(outputTag, outputValue)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java new file mode 100644 index 0000000..c4da58a --- /dev/null +++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Translator type for {@link Combine.Globally}.
+ *
+ * <p>Declares no members of its own; everything it does is inherited from
+ * {@code TransformTranslator.Default}.
+ */
+public class CombineGloballyTranslator<InputT, OutputT> extends TransformTranslator.Default<Combine.Globally<InputT, OutputT>> {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java new file mode 100644 index 0000000..99cbff7 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Translator type for {@link Combine.PerKey}.
+ *
+ * <p>Declares no members of its own; everything it does is inherited from
+ * {@code TransformTranslator.Default}.
+ */
+public class CombinePerKeyTranslator<K, InputT, OutputT> extends TransformTranslator.Default<Combine.PerKey<K, InputT, OutputT>> {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java new file mode 100644 index 0000000..4558216 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.translator; + +import com.google.common.collect.Maps; +import org.apache.beam.sdk.transforms.Flatten; + +import org.apache.beam.runners.jstorm.translation.TranslationContext; +import org.apache.beam.runners.jstorm.translation.runtime.FlattenExecutor; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; + +import java.util.Map; + +public class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollections<V>> { + + @Override + public void translateNode(Flatten.PCollections<V> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + + // Since a new tag is created in PCollectionList, retrieve the real tag here. 
+ Map<TupleTag<?>, PValue> inputs = Maps.newHashMap(); + for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) { + PCollection<V> pc = (PCollection<V>) entry.getValue(); + inputs.putAll(pc.expand()); + } + System.out.println("Real inputs: " + inputs); + System.out.println("FlattenList inputs: " + userGraphContext.getInputs()); + String description = describeTransform(transform, inputs, userGraphContext.getOutputs()); + FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag()); + context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java new file mode 100644 index 0000000..6b8297b --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.translator; + +import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor; +import com.google.common.collect.Lists; +import org.apache.beam.sdk.transforms.GroupByKey; + +import org.apache.beam.runners.jstorm.translation.TranslationContext; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; + +import java.util.Collections; +import java.util.List; + +public class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> { + // information of transform + protected PCollection<KV<K, V>> input; + protected PCollection<KV<K, Iterable<V>>> output; + protected List<TupleTag<?>> inputTags; + protected TupleTag<KV<K, Iterable<V>>> mainOutputTag; + protected List<TupleTag<?>> sideOutputTags; + protected List<PCollectionView<?>> sideInputs; + protected WindowingStrategy<?, ?> windowingStrategy; + + @Override + public void translateNode(GroupByKey<K, V> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); + + input = (PCollection<KV<K, V>>) userGraphContext.getInput(); + output = (PCollection<KV<K, Iterable<V>>>) userGraphContext.getOutput(); + + inputTags = userGraphContext.getInputTags(); + 
mainOutputTag = (TupleTag<KV<K, Iterable<V>>>) userGraphContext.getOutputTag(); + sideOutputTags = Lists.newArrayList(); + + sideInputs = Collections.<PCollectionView<?>>emptyList(); + windowingStrategy = input.getWindowingStrategy(); + + GroupByWindowExecutor<K, V> groupByWindowExecutor = new GroupByWindowExecutor<>( + userGraphContext.getStepName(), + description, + context, + context.getUserGraphContext().getOptions(), + windowingStrategy, + mainOutputTag, + sideOutputTags); + context.addTransformExecutor(groupByWindowExecutor); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java new file mode 100644 index 0000000..c487578 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.translator; + +import avro.shaded.com.google.common.collect.Maps; +import org.apache.beam.runners.jstorm.translation.TranslationContext; +import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor; +import org.apache.beam.runners.jstorm.translation.runtime.MultiOutputDoFnExecutor; +import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.*; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Translates a ParDo.BoundMulti to a Storm {@link DoFnExecutor}. 
+ */ +public class ParDoBoundMultiTranslator<InputT, OutputT> + extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> { + + @Override + public void translateNode(ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) { + final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + final TupleTag<InputT> inputTag = (TupleTag<InputT>) userGraphContext.getInputTag(); + PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput(); + + Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs()); + Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap(); + for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) { + Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator(); + localToExternalTupleTagMap.put(entry.getKey(), itr.next()); + } + + TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag(); + List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags(); + sideOutputTags.remove(mainOutputTag); + + Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs()); + for (PCollectionView pCollectionView : transform.getSideInputs()) { + allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); + } + String description = describeTransform( + transform, + allInputs, + allOutputs); + + ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder(); + for (PCollectionView pCollectionView : transform.getSideInputs()) { + sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); + } + + DoFnExecutor executor; + DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); + if (signature.stateDeclarations().size() > 0 + || signature.timerDeclarations().size() > 0) { + executor = new MultiStatefulDoFnExecutor<>( + 
userGraphContext.getStepName(), + description, + userGraphContext.getOptions(), + (DoFn<KV, OutputT>) transform.getFn(), + (Coder) WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), + input.getWindowingStrategy(), + (TupleTag<KV>) inputTag, + transform.getSideInputs(), + sideInputTagToView.build(), + mainOutputTag, + sideOutputTags, + localToExternalTupleTagMap); + } else { + executor = new MultiOutputDoFnExecutor<>( + userGraphContext.getStepName(), + description, + userGraphContext.getOptions(), + transform.getFn(), + WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), + input.getWindowingStrategy(), + inputTag, + transform.getSideInputs(), + sideInputTagToView.build(), + mainOutputTag, + sideOutputTags, + localToExternalTupleTagMap); + } + + context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs())); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java new file mode 100644 index 0000000..3a952a9 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.translator; + +import java.util.List; +import java.util.Map; + +import avro.shaded.com.google.common.collect.Lists; +import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.*; + +import org.apache.beam.runners.jstorm.translation.TranslationContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Translates a ParDo.Bound to a Storm {@link DoFnExecutor}. 
+ */ +public class ParDoBoundTranslator<InputT, OutputT> + extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> { + + private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class); + + @Override + public void translateNode(ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) { + final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + final TupleTag<?> inputTag = userGraphContext.getInputTag(); + PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput(); + + TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag(); + List<TupleTag<?>> sideOutputTags = Lists.newArrayList(); + + Map<TupleTag<?>, PValue> allInputs = avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs()); + for (PCollectionView pCollectionView : transform.getSideInputs()) { + allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); + } + String description = describeTransform( + transform, + allInputs, + userGraphContext.getOutputs()); + + ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder(); + for (PCollectionView pCollectionView : transform.getSideInputs()) { + sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); + } + + DoFnExecutor executor; + DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); + if (signature.stateDeclarations().size() > 0 + || signature.timerDeclarations().size() > 0) { + executor = new StatefulDoFnExecutor<>( + userGraphContext.getStepName(), + description, + userGraphContext.getOptions(), + (DoFn<KV, OutputT>) transform.getFn(), + (Coder) WindowedValue.getFullCoder( + input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), + input.getWindowingStrategy(), + (TupleTag<KV>) inputTag, + transform.getSideInputs(), + sideInputTagToView.build(), + 
mainOutputTag, + sideOutputTags); + } else { + executor = new DoFnExecutor<>( + userGraphContext.getStepName(), + description, + userGraphContext.getOptions(), + transform.getFn(), + WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), + input.getWindowingStrategy(), + (TupleTag<InputT>) inputTag, + transform.getSideInputs(), + sideInputTagToView.build(), + mainOutputTag, + sideOutputTags); + } + + context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs())); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java new file mode 100644 index 0000000..1ef1ec3 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.sdk.transforms.Reshuffle;
+
+/**
+ * Translator registered for {@link Reshuffle}.
+ *
+ * <p>The class declares no members of its own, so it uses whatever
+ * {@code TransformTranslator.Default} provides for {@code translateNode} and
+ * {@code canTranslate}.
+ */
+public class ReshuffleTranslator<K, V> extends TransformTranslator.Default<Reshuffle<K,V>> {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
new file mode 100644
index 0000000..9f69391
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.beam.runners.jstorm.translation.translator; + +import com.google.auto.value.AutoValue; + +import javax.annotation.Nullable; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Class that defines the stream connection between upstream and downstream components. + */ +@AutoValue +public abstract class Stream { + + public abstract Producer getProducer(); + public abstract Consumer getConsumer(); + + public static Stream of(Producer producer, Consumer consumer) { + return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream(producer, consumer); + } + + @AutoValue + public abstract static class Producer { + public abstract String getComponentId(); + public abstract String getStreamId(); + public abstract String getStreamName(); + + public static Producer of(String componentId, String streamId, String streamName) { + return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Producer( + componentId, streamId, streamName); + } + } + + @AutoValue + public abstract static class Consumer { + public abstract String getComponentId(); + public abstract Grouping getGrouping(); + + public static Consumer of(String componentId, Grouping grouping) { + return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Consumer( + componentId, grouping); + } + } + + @AutoValue + public abstract static class Grouping { + public abstract Type getType(); + + @Nullable + public abstract List<String> getFields(); + + public static Grouping of(Type type) { + checkArgument(!Type.FIELDS.equals(type), "Fields grouping should provide key fields."); + return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Grouping( + type, null /* fields */); + } + + public static Grouping byFields(List<String> fields) { + checkNotNull(fields, "fields"); + checkArgument(!fields.isEmpty(), "No key fields were 
provided for field grouping!"); + return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Grouping( + Type.FIELDS, fields); + } + + /** + * Types of stream groupings Storm allows + */ + public enum Type { + ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java new file mode 100644 index 0000000..bebdf7b --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation.translator; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.collect.FluentIterable; +import org.apache.beam.runners.jstorm.translation.TranslationContext; +import org.apache.beam.sdk.transforms.PTransform; + +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; + +import java.util.Map; + +/** + * Interface for classes capable of tranforming Beam PTransforms into Storm primitives. + */ +public interface TransformTranslator<T extends PTransform<?, ?>> { + + void translateNode(T transform, TranslationContext context); + + /** + * Returns true if this translator can translate the given transform. + */ + boolean canTranslate(T transform, TranslationContext context); + + class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> { + @Override + public void translateNode(T1 transform, TranslationContext context) { + + } + + @Override + public boolean canTranslate(T1 transform, TranslationContext context) { + return true; + } + + static String describeTransform( + PTransform<?, ?> transform, + Map<TupleTag<?>, PValue> inputs, + Map<TupleTag<?>, PValue> outputs) { + return String.format("%s --> %s --> %s", + Joiner.on('+').join(FluentIterable.from(inputs.entrySet()) + .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() { + @Override + public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) { + return taggedPValue.getKey().getId(); + // return taggedPValue.getValue().getName(); + }})), + transform.getName(), + Joiner.on('+').join(FluentIterable.from(outputs.entrySet()) + .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() { + @Override + public String apply(Map.Entry<TupleTag<?>, PValue> taggedPvalue) { + return taggedPvalue.getKey().getId(); + //return taggedPValue.getValue().getName(); + }}))); + } + } +} 
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java new file mode 100644 index 0000000..ac7d7bd --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.translator; + +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; + +import org.apache.beam.runners.jstorm.translation.TranslationContext; +import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout; + +/** + * Translates a Read.Unbounded into a Storm spout. 
+ * + * @param <T> + */ +public class UnboundedSourceTranslator<T> extends TransformTranslator.Default<Read.Unbounded<T>> { + public void translateNode(Read.Unbounded<T> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); + + TupleTag<?> tag = userGraphContext.getOutputTag(); + PValue output = userGraphContext.getOutput(); + + UnboundedSourceSpout spout = new UnboundedSourceSpout( + description, + transform.getSource(), userGraphContext.getOptions(), tag); + context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java new file mode 100644 index 0000000..0ebf837 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.runtime.ViewExecutor;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link TransformTranslator} for executing {@link View Views} in JStorm runner.
+ */
+public class ViewTranslator extends TransformTranslator.Default<ViewTranslator.CreateJStormPCollectionView<?, ?>> {
+    /**
+     * Registers a {@link ViewExecutor}, built from the transform description and the output tag,
+     * with the translation context.
+     */
+    @Override
+    public void translateNode(CreateJStormPCollectionView<?, ?> transform, TranslationContext context) {
+        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+        String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+        ViewExecutor viewExecutor = new ViewExecutor(description, userGraphContext.getOutputTag());
+        context.addTransformExecutor(viewExecutor);
+    }
+
+    /**
+     * Specialized implementation for
+     * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
+     * for the JStorm runner in streaming mode.
+     */
+    public static class ViewAsMap<K, V>
+            extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+        public ViewAsMap(View.AsMap<K, V> transform) {
+        }
+
+        /**
+         * Concatenates the whole input into a single list and turns it into a map view.
+         */
+        @Override
+        public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+            PCollectionView<Map<K, V>> view =
+                PCollectionViews.mapView(
+                    input,
+                    input.getWindowingStrategy(),
+                    input.getCoder());
+
+            @SuppressWarnings({"rawtypes", "unchecked"})
+            KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+            try {
+                inputCoder.getKeyCoder().verifyDeterministic();
+            } catch (Coder.NonDeterministicException e) {
+                // A non-deterministic key coder is currently ignored rather than reported.
+                // TODO: log warning as other runners.
+            }
+
+            return input
+                .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+                .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+        }
+
+        @Override
+        protected String getKindString() {
+            return "StreamingViewAsMap";
+        }
+    }
+
+    /**
+     * Specialized expansion for {@link
+     * View.AsMultimap View.AsMultimap} for the
+     * JStorm runner in streaming mode.
+     */
+    public static class ViewAsMultimap<K, V>
+            extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+
+        /**
+         * Builds an instance of this class from the overridden transform.
+         */
+        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+        public ViewAsMultimap(View.AsMultimap<K, V> transform) {
+        }
+
+        /**
+         * Concatenates the whole input into a single list and turns it into a multimap view.
+         */
+        @Override
+        public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+            PCollectionView<Map<K, Iterable<V>>> view =
+                PCollectionViews.multimapView(
+                    input,
+                    input.getWindowingStrategy(),
+                    input.getCoder());
+
+            @SuppressWarnings({"rawtypes", "unchecked"})
+            KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+            try {
+                inputCoder.getKeyCoder().verifyDeterministic();
+            } catch (Coder.NonDeterministicException e) {
+                // A non-deterministic key coder is currently ignored rather than reported.
+                // TODO: log warning as other runners.
+            }
+
+            return input
+                .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+                .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+        }
+
+        @Override
+        protected String getKindString() {
+            return "StreamingViewAsMultimap";
+        }
+    }
+
+    /**
+     * Specialized implementation for
+     * {@link View.AsList View.AsList} for the
+     * JStorm runner in streaming mode.
+     */
+    public static class ViewAsList<T>
+            extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+        /**
+         * Builds an instance of this class from the overridden transform.
+         */
+        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+        public ViewAsList(View.AsList<T> transform) {}
+
+        /**
+         * Concatenates the whole input into a single list and turns it into a list view.
+         */
+        @Override
+        public PCollectionView<List<T>> expand(PCollection<T> input) {
+            PCollectionView<List<T>> view =
+                PCollectionViews.listView(
+                    input,
+                    input.getWindowingStrategy(),
+                    input.getCoder());
+
+            return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+                .apply(CreateJStormPCollectionView.<T, List<T>>of(view));
+        }
+
+        @Override
+        protected String getKindString() {
+            return "StreamingViewAsList";
+        }
+    }
+
+    /**
+     * Specialized implementation for
+     * {@link View.AsIterable View.AsIterable} for the
+     * JStorm runner in streaming mode.
+     */
+    public static class ViewAsIterable<T>
+            extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+        /**
+         * Builds an instance of this class from the overridden transform.
+         */
+        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+        public ViewAsIterable(View.AsIterable<T> transform) { }
+
+        /**
+         * Concatenates the whole input into a single list and turns it into an iterable view.
+         */
+        @Override
+        public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+            PCollectionView<Iterable<T>> view =
+                PCollectionViews.iterableView(
+                    input,
+                    input.getWindowingStrategy(),
+                    input.getCoder());
+
+            return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+                .apply(CreateJStormPCollectionView.<T, Iterable<T>>of(view));
+        }
+
+        @Override
+        protected String getKindString() {
+            return "StreamingViewAsIterable";
+        }
+    }
+
+    /**
+     * Specialized expansion for
+     * {@link View.AsSingleton View.AsSingleton} for the
+     * JStorm runner in streaming mode.
+     */
+    public static class ViewAsSingleton<T>
+            extends PTransform<PCollection<T>, PCollectionView<T>> {
+        private View.AsSingleton<T> transform;
+
+        /**
+         * Builds an instance of this class from the overridden transform.
+         */
+        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+        public ViewAsSingleton(View.AsSingleton<T> transform) {
+            this.transform = transform;
+        }
+
+        /**
+         * Applies a global {@link SingletonCombine} as a singleton view; defaults are disabled
+         * on the combine when the overridden transform has no default value.
+         */
+        @Override
+        public PCollectionView<T> expand(PCollection<T> input) {
+            Combine.Globally<T, T> combine = Combine.globally(
+                new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
+            if (!transform.hasDefaultValue()) {
+                combine = combine.withoutDefaults();
+            }
+            return input.apply(combine.asSingletonView());
+        }
+
+        @Override
+        protected String getKindString() {
+            return "StreamingViewAsSingleton";
+        }
+
+        /**
+         * Combine function that rejects any attempt to merge two elements and supplies the
+         * configured default value, if there is one, as its identity.
+         */
+        private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+            private boolean hasDefaultValue;
+            private T defaultValue;
+
+            SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+                this.hasDefaultValue = hasDefaultValue;
+                this.defaultValue = defaultValue;
+            }
+
+            /** Always throws: being asked to combine two values means more than one element. */
+            @Override
+            public T apply(T left, T right) {
+                throw new IllegalArgumentException("PCollection with more than one element "
+                    + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+                    + "combine the PCollection into a single value");
+            }
+
+            /** Returns the default value, or throws when no default value was configured. */
+            @Override
+            public T identity() {
+                if (hasDefaultValue) {
+                    return defaultValue;
+                } else {
+                    throw new IllegalArgumentException(
+                        "Empty PCollection accessed as a singleton view. "
+                            + "Consider setting withDefault to provide a default value");
+                }
+            }
+        }
+    }
+
+    /**
+     * Expansion for {@link Combine.GloballyAsSingletonView} that combines the input globally and
+     * exposes the combined value through a singleton view.
+     */
+    public static class CombineGloballyAsSingletonView<InputT, OutputT>
+            extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+        Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+
+        /**
+         * Builds an instance of this class from the overridden transform.
+         */
+        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+        public CombineGloballyAsSingletonView(
+                Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+            this.transform = transform;
+        }
+
+        /**
+         * Combines the input globally (without defaults, with the transform's fanout), builds a
+         * singleton view over the combined collection, and feeds each combined value, wrapped in
+         * a singleton list, into {@code CreateJStormPCollectionView}.
+         */
+        @Override
+        public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+            PCollection<OutputT> combined =
+                input.apply(Combine.globally(transform.getCombineFn())
+                    .withoutDefaults()
+                    .withFanout(transform.getFanout()));
+
+            // The combine fn's default value is only supplied when a default should be inserted.
+            PCollectionView<OutputT> view = PCollectionViews.singletonView(
+                combined,
+                combined.getWindowingStrategy(),
+                transform.getInsertDefault(),
+                transform.getInsertDefault()
+                    ? transform.getCombineFn().defaultValue() : null,
+                combined.getCoder());
+            return combined
+                .apply(ParDo.of(new WrapAsList<OutputT>()))
+                .apply(CreateJStormPCollectionView.<OutputT, OutputT>of(view));
+        }
+
+        @Override
+        protected String getKindString() {
+            return "StreamingCombineGloballyAsSingletonView";
+        }
+    }
+
+    /**
+     * {@link DoFn} that outputs each input element wrapped in a singleton list.
+     */
+    private static class WrapAsList<T> extends DoFn<T, List<T>> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            c.output(Collections.singletonList(c.element()));
+        }
+    }
+
+    /**
+     * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+     * They require the input {@link PCollection} fits in memory.
+     * For a large {@link PCollection} this is expected to crash!
+     *
+     * @param <T> the type of elements to concatenate.
+ */ + private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> { + private static final long serialVersionUID = 1L; + + @Override + public List<T> createAccumulator() { + return new ArrayList<>(); + } + + @Override + public List<T> addInput(List<T> accumulator, T input) { + accumulator.add(input); + return accumulator; + } + + @Override + public List<T> mergeAccumulators(Iterable<List<T>> accumulators) { + List<T> result = createAccumulator(); + for (List<T> accumulator : accumulators) { + result.addAll(accumulator); + } + return result; + } + + @Override + public List<T> extractOutput(List<T> accumulator) { + return accumulator; + } + + @Override + public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) { + return ListCoder.of(inputCoder); + } + + @Override + public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) { + return ListCoder.of(inputCoder); + } + } + + /** + * Creates a primitive {@link PCollectionView}. + * + * <p>For internal use only by runner implementors. 
+ * + * @param <ElemT> The type of the elements of the input PCollection + * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input + */ + public static class CreateJStormPCollectionView<ElemT, ViewT> + extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> { + private PCollectionView<ViewT> view; + + private CreateJStormPCollectionView(PCollectionView<ViewT> view) { + this.view = view; + } + + public static <ElemT, ViewT> CreateJStormPCollectionView<ElemT, ViewT> of( + PCollectionView<ViewT> view) { + return new CreateJStormPCollectionView<>(view); + } + + @Override + public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) { + return view; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java new file mode 100644 index 0000000..0bf9a49 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.translator; + +import org.apache.beam.runners.jstorm.translation.runtime.WindowAssignExecutor; +import org.apache.beam.sdk.transforms.windowing.Window; + +import org.apache.beam.runners.jstorm.translation.TranslationContext; + +public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> { + + @Override + public void translateNode(Window.Assign<T> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); + context.getUserGraphContext().setWindowed(); + WindowAssignExecutor executor = new WindowAssignExecutor( + description, + transform.getWindowFn(), + userGraphContext.getOutputTag()); + context.addTransformExecutor(executor); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java new file mode 100644 index 0000000..b67aff9 --- /dev/null +++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.translator; + +import org.apache.beam.runners.jstorm.translation.TranslationContext; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.Window; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Translates a Window.Bound node into a Storm WindowedBolt + * + * @param <T> + */ +public class WindowBoundTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> { + private static final Logger LOG = LoggerFactory.getLogger(WindowBoundTranslator.class); + + // Do nothing here currently. The assign of window strategy is included in AssignTranslator. 
+ @Override + public void translateNode(Window.Assign<T> transform, TranslationContext context) { + if (transform.getWindowFn() instanceof FixedWindows) { + context.getUserGraphContext().setWindowed(); + } else if (transform.getWindowFn() instanceof SlidingWindows) { + context.getUserGraphContext().setWindowed(); + } else { + throw new UnsupportedOperationException("Not supported window type currently: " + transform.getWindowFn()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java new file mode 100644 index 0000000..07a3ad5 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation.util; + +public class CommonInstance { + public static final String KEY = "Key"; + public static final String VALUE = "Value"; + + public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK"; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java new file mode 100644 index 0000000..87562fd --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.beam.runners.jstorm.translation.util;

import java.io.Serializable;
import javax.annotation.Nullable;

import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;

/**
 * No-op SideInputReader implementation: it reports itself as empty, contains no view and
 * yields {@code null} for every lookup.
 */
public class DefaultSideInputReader implements SideInputReader, Serializable {

  /**
   * Always returns {@code null}, whatever view and window are requested.
   *
   * @param pCollectionView the requested view (ignored)
   * @param boundedWindow the requested window (ignored)
   * @return {@code null}
   */
  @Nullable
  @Override
  public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
    return null;
  }

  /**
   * Always returns {@code false}: this reader holds no views.
   *
   * @param pCollectionView the view to look for (ignored)
   * @return {@code false}
   */
  @Override
  public <T> boolean contains(PCollectionView<T> pCollectionView) {
    return false;
  }

  /**
   * Always returns {@code true}: this reader holds no views.
   *
   * @return {@code true}
   */
  @Override
  public boolean isEmpty() {
    return true;
  }
}
