http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java deleted file mode 100644 index 615ac8b..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.serialization; - -import backtype.storm.Config; -import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; -import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; -import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; -import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; - -/** - * Specific serializer of {@link Kryo} for Unmodifiable Collection. - */ -public class UnmodifiableCollectionsSerializer extends Serializer<Object> { - - private static final Field SOURCE_COLLECTION_FIELD; - private static final Field SOURCE_MAP_FIELD; - - static { - try { - SOURCE_COLLECTION_FIELD = Class.forName("java.util.Collections$UnmodifiableCollection") - .getDeclaredField("c"); - SOURCE_COLLECTION_FIELD.setAccessible(true); - - - SOURCE_MAP_FIELD = Class.forName("java.util.Collections$UnmodifiableMap") - .getDeclaredField("m"); - SOURCE_MAP_FIELD.setAccessible(true); - } catch (final Exception e) { - throw new RuntimeException("Could not access source collection" - + " field in java.util.Collections$UnmodifiableCollection.", e); - } - } - - @Override - public Object read(final Kryo kryo, final Input input, final Class<Object> clazz) { - final int ordinal = input.readInt(true); - final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.values()[ordinal]; - final Object sourceCollection = kryo.readClassAndObject(input); - return unmodifiableCollection.create(sourceCollection); - } - - @Override - public void write(final Kryo kryo, final Output output, final Object object) { - try { - final UnmodifiableCollection unmodifiableCollection = - UnmodifiableCollection.valueOfType(object.getClass()); - // the ordinal could be replaced by s.th. else (e.g. a explicitely managed "id") - output.writeInt(unmodifiableCollection.ordinal(), true); - kryo.writeClassAndObject(output, unmodifiableCollection.sourceCollectionField.get(object)); - } catch (final RuntimeException e) { - // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write... - // handles SerializationException specifically (resizing the buffer)... - throw e; - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public Object copy(Kryo kryo, Object original) { - try { - final UnmodifiableCollection unmodifiableCollection = - UnmodifiableCollection.valueOfType(original.getClass()); - Object sourceCollectionCopy = - kryo.copy(unmodifiableCollection.sourceCollectionField.get(original)); - return unmodifiableCollection.create(sourceCollectionCopy); - } catch (final RuntimeException e) { - // Don't eat and wrap RuntimeExceptions - throw e; - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - private enum UnmodifiableCollection { - COLLECTION( - Collections.unmodifiableCollection(Arrays.asList("")).getClass(), - SOURCE_COLLECTION_FIELD) { - @Override - public Object create(final Object sourceCollection) { - return Collections.unmodifiableCollection((Collection<?>) sourceCollection); - } - }, - RANDOM_ACCESS_LIST( - Collections.unmodifiableList(new ArrayList<Void>()).getClass(), - SOURCE_COLLECTION_FIELD) { - @Override - public Object create(final Object sourceCollection) { - return Collections.unmodifiableList((List<?>) sourceCollection); - } - }, - LIST(Collections.unmodifiableList(new LinkedList<Void>()).getClass(), SOURCE_COLLECTION_FIELD) { - @Override - public Object create(final Object sourceCollection) { - return Collections.unmodifiableList((List<?>) sourceCollection); - } - }, - SET(Collections.unmodifiableSet(new HashSet<Void>()).getClass(), SOURCE_COLLECTION_FIELD) { - @Override - public Object create(final Object sourceCollection) { - return Collections.unmodifiableSet((Set<?>) sourceCollection); - } - }, - SORTED_SET( - Collections.unmodifiableSortedSet(new TreeSet<Void>()).getClass(), - SOURCE_COLLECTION_FIELD) { - @Override - public Object create(final Object sourceCollection) { - return Collections.unmodifiableSortedSet((SortedSet<?>) sourceCollection); - } - }, - MAP(Collections.unmodifiableMap(new HashMap<Void, Void>()).getClass(), SOURCE_MAP_FIELD) { - @Override - public Object create(final Object sourceCollection) { - return Collections.unmodifiableMap((Map<?, ?>) sourceCollection); - } - - }, - SORTED_MAP( - Collections.unmodifiableSortedMap(new TreeMap<Void, Void>()).getClass(), - SOURCE_MAP_FIELD) { - @Override - public Object create(final Object sourceCollection) { - return Collections.unmodifiableSortedMap((SortedMap<?, ?>) sourceCollection); - } - }; - - private final Class<?> type; - private final Field sourceCollectionField; - - private UnmodifiableCollection(final Class<?> type, final Field sourceCollectionField) { - this.type = type; - this.sourceCollectionField = sourceCollectionField; - } - - /** - * @param sourceCollection - */ - public abstract Object create(Object sourceCollection); - - static UnmodifiableCollection valueOfType(final Class<?> type) { - for (final UnmodifiableCollection item : values()) { - if (item.type.equals(type)) { - return item; - } - } - throw new IllegalArgumentException("The type " + type + " is not supported."); - } - - } - - /** - * Creates a new {@link UnmodifiableCollectionsSerializer} and registers its serializer - * for the several unmodifiable Collections that can be created via {@link Collections}, - * including {@link Map}s. - * - * @see Collections#unmodifiableCollection(Collection) - * @see Collections#unmodifiableList(List) - * @see Collections#unmodifiableSet(Set) - * @see Collections#unmodifiableSortedSet(SortedSet) - * @see Collections#unmodifiableMap(Map) - * @see Collections#unmodifiableSortedMap(SortedMap) - */ - public static void registerSerializers(Config config) { - UnmodifiableCollection.values(); - for (final UnmodifiableCollection item : UnmodifiableCollection.values()) { - config.registerSerialization(item.type, UnmodifiableCollectionsSerializer.class); - } - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java index f64193e..53555c9 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java @@ -39,6 +39,7 @@ class BoundedSourceTranslator<T> extends TransformTranslator.Default<Read.Bounde TupleTag<?> outputTag = userGraphContext.getOutputTag(); PValue outputValue = userGraphContext.getOutput(); UnboundedSourceSpout spout = new UnboundedSourceSpout( + userGraphContext.getStepName(), description, new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource()), userGraphContext.getOptions(), outputTag); http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java index 6baa944..2148f34 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java @@ -277,7 +277,7 @@ class DoFnExecutor<InputT, OutputT> implements Executor { pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); if (pushedBackElements != null) { for (WindowedValue<InputT> elem : pushedBackElements.read()) { - LOG.info("Process pushback elem={}", elem); + LOG.debug("Process pushed back elem: {}", elem); runner.processElement(elem); } pushedBackElements.clear(); http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java index 145b224..8812988 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.values.TupleTag; /** * An executor is a basic executable unit in a JStorm task. */ -interface Executor extends Serializable { +public interface Executor extends Serializable { /** * Initialization during runtime. */ http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java index 33393f2..f8e09be 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java @@ -36,6 +36,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -63,6 +64,8 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { // map from input tag to executor inside bolt protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap(); + protected final Map<Executor, Collection<TupleTag>> executorToOutputTags = Maps.newHashMap(); + protected final Map<Executor, String> executorToPTransformName = Maps.newHashMap(); // set of all output tags that will be emit outside bolt protected final Set<TupleTag> outputTags = Sets.newHashSet(); protected final Set<TupleTag> externalOutputTags = Sets.newHashSet(); @@ -84,16 +87,21 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { isStatefulBolt = isStateful; } - public void addExecutor(TupleTag inputTag, Executor executor) { + public void addExecutor(TupleTag inputTag, Executor executor, String name) { inputTagToExecutor.put( checkNotNull(inputTag, "inputTag"), checkNotNull(executor, "executor")); + executorToPTransformName.put(executor, name); } public Map<TupleTag, Executor> getExecutors() { return inputTagToExecutor; } + public Map<Executor, String> getExecutorNames() { + return executorToPTransformName; + } + public void registerExecutor(Executor executor) { if (executor instanceof DoFnExecutor) { DoFnExecutor doFnExecutor = (DoFnExecutor) executor; @@ -107,14 +115,31 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { return idToDoFnExecutor; } - public void addOutputTags(TupleTag tag) { - outputTags.add(tag); + public void addOutputTags(Executor executor, TupleTag outputTag) { + Collection<TupleTag> outTags; + if (executorToOutputTags.containsKey(executor)) { + outTags = executorToOutputTags.get(executor); + } else { + outTags = Sets.newHashSet(); + executorToOutputTags.put(executor, outTags); + } + outTags.add(outputTag); + + outputTags.add(outputTag); + } + + public Map<Executor, Collection<TupleTag>> getExecutorToOutputTags() { + return executorToOutputTags; } public void addExternalOutputTag(TupleTag<?> tag) { externalOutputTags.add(tag); } + public Collection<TupleTag> getExternalOutputTags() { + return externalOutputTags; + } + public Set<TupleTag> getOutputTags() { return outputTags; } @@ -328,6 +353,10 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { for (Executor executor : inputTagToExecutor.values()) { ret.add(executor.toString()); } + ret.add("outputTags"); + for (TupleTag outputTag : outputTags) { + ret.add(outputTag.getId()); + } ret.add("externalOutputTags"); for (TupleTag output : externalOutputTags) { ret.add(output.getId()); http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java index b96bc56..ebe8bc3 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java @@ -70,6 +70,7 @@ class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollecti PValue output = userGraphContext.getOutput(); UnboundedSourceSpout spout = new UnboundedSourceSpout( + userGraphContext.getStepName(), description, new EmptySource(), userGraphContext.getOptions(), http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java index 90ef6d2..292b771 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.Map; import java.util.NoSuchElementException; import javax.annotation.Nullable; + import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateTag; @@ -328,8 +329,8 @@ class JStormStateInternals<K> implements StateInternals { public void add(T input) { try { int elemIndex = getElementIndex(); + stateInfoKvState.put(getComposedKey(), elemIndex + 1); kvState.put(getComposedKey(elemIndex), input); - stateInfoKvState.put(getComposedKey(), ++elemIndex); } catch (IOException e) { throw new RuntimeException(e.getCause()); } @@ -381,23 +382,11 @@ class JStormStateInternals<K> implements StateInternals { } private ComposedKey getComposedKey() { - return ComposedKey.of(key, namespace); + return ComposedKey.of(id, key, namespace); } private ComposedKey getComposedKey(int elemIndex) { - return ComposedKey.of(key, namespace, elemIndex); - } - - @Override - public String toString() { - int elemIndex = -1; - try { - elemIndex = getElementIndex(); - } catch (IOException e) { - - } - return String.format("stateId=%s, key=%s, namespace=%s, elementIndex=%d", - id, key, namespace, elemIndex); + return ComposedKey.of(id, key, namespace, elemIndex); } @Override @@ -475,11 +464,6 @@ class JStormStateInternals<K> implements StateInternals { public Iterator<T> iterator() { return new BagStateIterator(); } - - @Override - public String toString() { - return String.format("BagStateIterable: composedKey=%s", getComposedKey()); - } } } http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java index 101921f..0991448 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java @@ -186,7 +186,7 @@ public class TranslationContext { executionGraphContext.registerStreamProducer( TaggedPValue.of(tag, value), Stream.Producer.of(name, tag.getId(), value.getName())); - //bolt.addOutputTags(tag); + bolt.addOutputTags(executor, tag); } // add the transform executor into the chain of ExecutorsBolt @@ -196,7 +196,7 @@ public class TranslationContext { if (userGraphContext.findTupleTag(value) != null) { tag = userGraphContext.findTupleTag(value); } - bolt.addExecutor(tag, executor); + bolt.addExecutor(tag, executor, userGraphContext.getStepName()); // filter all connections inside bolt //if (!bolt.getOutputTags().contains(tag)) { @@ -212,7 +212,7 @@ public class TranslationContext { for (PValue sideInput : sideInputs) { TupleTag tag = userGraphContext.findTupleTag(sideInput); - bolt.addExecutor(tag, executor); + bolt.addExecutor(tag, executor, userGraphContext.getStepName()); checkState(!bolt.getOutputTags().contains(tag)); addStormStreamDef( TaggedPValue.of(tag, sideInput), name, Stream.Grouping.of(Stream.Grouping.Type.ALL)); @@ -304,6 +304,15 @@ public class TranslationContext { return pValueToTupleTag.get(checkNotNull(pValue, "pValue")); } + public PValue findPValue(TupleTag tupleTag) { + for (Map.Entry<PValue, TupleTag> entry : pValueToTupleTag.entrySet()) { + if (entry.getValue().equals(tupleTag)) { + return entry.getKey(); + } + } + return null; + } + public void setWindowed() { this.isWindowed = true; } @@ -361,6 +370,10 @@ public class TranslationContext { return this.spoutMap; } + public Map<String, ExecutorsBolt> getBolts() { + return this.boltMap; + } + public String registerBolt(ExecutorsBolt bolt) { checkNotNull(bolt, "bolt"); String name = "bolt" + genId(); http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java index dab9518..4ae28e6 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java @@ -27,6 +27,7 @@ import com.alibaba.jstorm.utils.KryoSerializer; import java.io.IOException; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.beam.runners.jstorm.JStormPipelineOptions; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory; public class UnboundedSourceSpout extends AbstractComponent implements IRichSpout { private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class); + private final String name; private final String description; private final UnboundedSource source; private final SerializedPipelineOptions serializedOptions; @@ -62,10 +64,12 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou private long lastWaterMark = 0L; public UnboundedSourceSpout( + String name, String description, UnboundedSource source, JStormPipelineOptions options, TupleTag<?> outputTag) { + this.name = name; this.description = checkNotNull(description, "description"); this.source = checkNotNull(source, "source"); this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options")); @@ -174,6 +178,14 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou } } + public String getName() { + return name; + } + + public TupleTag getOutputTag() { + return outputTag; + } + public UnboundedSource getUnboundedSource() { return source; } http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java index 54c9b94..7cf2469 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java @@ -37,6 +37,7 @@ class UnboundedSourceTranslator<T> extends TransformTranslator.Default<Read.Unbo PValue output = userGraphContext.getOutput(); UnboundedSourceSpout spout = new UnboundedSourceSpout( + userGraphContext.getStepName(), description, transform.getSource(), userGraphContext.getOptions(), tag); context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));
