jstorm-runner: Fix the failure of session window test cases in CombineTest
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52913b7e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52913b7e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52913b7e Branch: refs/heads/jstorm-runner Commit: 52913b7e2b01b4e6c65d96a10d745dd3e6739c83 Parents: 201ef72 Author: basti.lj <[email protected]> Authored: Thu Jul 20 14:37:29 2017 +0800 Committer: Pei He <[email protected]> Committed: Sat Aug 19 12:02:59 2017 +0800 ---------------------------------------------------------------------- .../jstorm/translation/FlattenTranslator.java | 1 - .../translation/JStormStateInternals.java | 188 +++++++++++++++++-- 2 files changed, 176 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/52913b7e/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java index 8f239bf..e104ad8 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/52913b7e/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java index 68a17e5..90ef6d2 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.state.StateBinder; import org.apache.beam.sdk.state.StateContext; import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.state.WatermarkHoldState; import org.apache.beam.sdk.transforms.Combine; @@ -93,13 +94,14 @@ class JStormStateInternals<K> implements StateInternals { } @Override - public <T extends State> T state(final StateNamespace namespace, StateTag<T> address) { + public <T extends State> T state(final StateNamespace namespace, final StateTag<T> address) { return address.getSpec().bind(address.getId(), new StateBinder() { @Override public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) { try { return new JStormValueState<>( - getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id))); + getStoreId(id), spec, getKey(), namespace, + kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id))); } catch (IOException e) { throw new RuntimeException(); } @@ -109,7 +111,8 @@ class JStormStateInternals<K> implements StateInternals { public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) { try { return new JStormBagState( - getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)), + getStoreId(id), spec, getKey(), namespace, + kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)), kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id))); } catch (IOException e) { throw new RuntimeException(); @@ -129,7 +132,8 @@ class JStormStateInternals<K> implements StateInternals { Coder<ValueT> mapValueCoder) { try { return new JStormMapState<>( - getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id))); + getStoreId(id), spec, (KeyT) getKey(), namespace, + kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id))); } catch (IOException e) { throw new RuntimeException(e); } @@ -143,10 +147,11 @@ class JStormStateInternals<K> implements StateInternals { Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { try { BagState<AccumT> accumBagState = new JStormBagState( - getKey(), namespace, + getStoreId(id), StateSpecs.<InputT>bag(), getKey(), namespace, kvStoreManager.<ComposedKey, AccumT>getOrCreate(getStoreId(id)), kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id))); - return new JStormCombiningState<>(accumBagState, combineFn); + return new JStormCombiningState<>( + id, spec, namespace, accumBagState, combineFn); } catch (IOException e) { throw new RuntimeException(); } @@ -169,7 +174,7 @@ class JStormStateInternals<K> implements StateInternals { final TimestampCombiner timestampCombiner) { try { BagState<Combine.Holder<Instant>> accumBagState = new JStormBagState( - getKey(), namespace, + getStoreId(id), StateSpecs.<Combine.Holder<Instant>>bag(), getKey(), namespace, kvStoreManager.<ComposedKey, Combine.Holder<Instant>>getOrCreate(getStoreId(id)), kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id))); @@ -181,8 +186,11 @@ class JStormStateInternals<K> implements StateInternals { } }; return new JStormWatermarkHoldState( - namespace, + id, spec, namespace, new JStormCombiningState<>( + getStoreId(id), + StateSpecs.combining(outputTimeCombineFn), + namespace, accumBagState, outputTimeCombineFn), timestampCombiner, @@ -199,12 +207,21 @@ class JStormStateInternals<K> implements StateInternals { */ private static class JStormValueState<K, T> implements ValueState<T> { + private final String id; + private final StateSpec<ValueState<T>> spec; @Nullable private final K key; private final StateNamespace namespace; private final IKvStore<ComposedKey, T> kvState; - JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) { + JStormValueState( + String id, + StateSpec<ValueState<T>> spec, + @Nullable K key, + StateNamespace namespace, + IKvStore<ComposedKey, T> kvState) { + this.id = id; + this.spec = spec; this.key = key; this.namespace = namespace; this.kvState = kvState; @@ -246,6 +263,29 @@ class JStormStateInternals<K> implements StateInternals { } } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + JStormValueState<?, ?> that = (JStormValueState<?, ?>) o; + + return namespace.equals(that.namespace) && id.equals(that.id) && spec.equals(that.spec); + + } + + @Override + public int hashCode() { + int result = namespace.hashCode(); + result = 31 * result + id.hashCode(); + result = 31 * result + spec.hashCode(); + return result; + } + private ComposedKey getComposedKey() { return ComposedKey.of(key, namespace); } @@ -256,6 +296,8 @@ class JStormStateInternals<K> implements StateInternals { */ private static class JStormBagState<K, T> implements BagState<T> { + private final String id; + private final StateSpec<BagState<T>> spec; @Nullable private final K key; private final StateNamespace namespace; @@ -263,10 +305,14 @@ class JStormStateInternals<K> implements StateInternals { private final IKvStore<ComposedKey, Object> stateInfoKvState; JStormBagState( + String id, + StateSpec<BagState<T>> spec, @Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState, IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException { + this.id = id; + this.spec = spec; this.key = key; this.namespace = checkNotNull(namespace, "namespace"); this.kvState = checkNotNull(kvState, "kvState"); @@ -350,8 +396,31 @@ class JStormStateInternals<K> implements StateInternals { } catch (IOException e) { } - return String.format("JStormBagState: key=%s, namespace=%s, elementIndex=%d", - key, namespace, elemIndex); + return String.format("stateId=%s, key=%s, namespace=%s, elementIndex=%d", + id, key, namespace, elemIndex); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + JStormBagState<?, ?> that = (JStormBagState<?, ?>) o; + + return namespace.equals(that.namespace) && id.equals(that.id) && spec.equals(that.spec); + + } + + @Override + public int hashCode() { + int result = namespace.hashCode(); + result = 31 * result + id.hashCode(); + result = 31 * result + spec.hashCode(); + return result; } /** @@ -420,13 +489,22 @@ class JStormStateInternals<K> implements StateInternals { private static class JStormCombiningState<InputT, AccumT, OutputT> implements CombiningState<InputT, AccumT, OutputT> { + private final String id; + private final StateSpec<CombiningState<InputT, AccumT, OutputT>> spec; + private final StateNamespace namespace; @Nullable private final BagState<AccumT> accumBagState; private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn; JStormCombiningState( + String id, + StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, + StateNamespace namespace, BagState<AccumT> accumBagState, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { + this.id = id; + this.spec = spec; + this.namespace = namespace; this.accumBagState = checkNotNull(accumBagState, "accumBagState"); this.combineFn = checkNotNull(combineFn, "combineFn"); } @@ -474,6 +552,29 @@ class JStormStateInternals<K> implements StateInternals { public void clear() { accumBagState.clear(); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + JStormCombiningState<?, ?, ?> that = (JStormCombiningState<?, ?, ?>) o; + + return namespace.equals(that.namespace) && id.equals(that.id) && spec.equals(that.spec); + + } + + @Override + public int hashCode() { + int result = namespace.hashCode(); + result = 31 * result + id.hashCode(); + result = 31 * result + spec.hashCode(); + return result; + } } /** @@ -483,11 +584,19 @@ class JStormStateInternals<K> implements StateInternals { */ private static class JStormMapState<K, V> implements MapState<K, V> { + private final String id; + private final StateSpec<MapState<K, V>> spec; private final K key; private final StateNamespace namespace; private IKvStore<K, V> kvStore; - JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) { + JStormMapState( + String id, + StateSpec<MapState<K, V>> spec, + K key, + StateNamespace namespace, IKvStore<K, V> kvStore) { + this.id = id; + this.spec = spec; this.key = key; this.namespace = namespace; this.kvStore = kvStore; @@ -582,6 +691,29 @@ class JStormStateInternals<K> implements StateInternals { } } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + JStormMapState<?, ?> that = (JStormMapState<?, ?>) o; + + return namespace.equals(that.namespace) && id.equals(that.id) && spec.equals(that.spec); + + } + + @Override + public int hashCode() { + int result = namespace.hashCode(); + result = 31 * result + id.hashCode(); + result = 31 * result + spec.hashCode(); + return result; + } + private void reportError(String errorInfo, IOException e) { LOG.error(errorInfo, e); throw new RuntimeException(errorInfo); @@ -611,16 +743,22 @@ class JStormStateInternals<K> implements StateInternals { */ private static class JStormWatermarkHoldState implements WatermarkHoldState { + private final String id; + private final StateSpec<WatermarkHoldState> spec; private final StateNamespace namespace; private final GroupingState<Instant, Instant> watermarkHoldsState; private final TimestampCombiner timestampCombiner; private final TimerService timerService; JStormWatermarkHoldState( + String id, + StateSpec<WatermarkHoldState> spec, StateNamespace namespace, GroupingState<Instant, Instant> watermarkHoldsState, TimestampCombiner timestampCombiner, TimerService timerService) { + this.id = checkNotNull(id, "id"); + this.spec = checkNotNull(spec, "spec"); this.namespace = checkNotNull(namespace, "namespace"); this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState"); this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner"); @@ -659,6 +797,32 @@ class JStormStateInternals<K> implements StateInternals { timerService.clearWatermarkHold(namespace.stringKey()); watermarkHoldsState.clear(); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + JStormWatermarkHoldState that = (JStormWatermarkHoldState) o; + + return namespace.equals(that.namespace) + && id.equals(that.id) + && spec.equals(that.spec) + && timestampCombiner.equals(that.timestampCombiner); + } + + @Override + public int hashCode() { + int result = namespace.hashCode(); + result = 31 * result + id.hashCode(); + result = 31 * result + spec.hashCode(); + result = 31 * result + timestampCombiner.hashCode(); + return result; + } } private String getStoreId(String stateId) {
