erenavsarogullari commented on a change in pull request #3109: Add withStream() in Streamlet to support stream selection
URL: https://github.com/apache/incubator-heron/pull/3109#discussion_r235553237
 
 

 ##########
 File path: heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
 ##########
 @@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+import org.apache.heron.streamlet.impl.utils.StreamletUtils;
+
+/**
+ * StreamletShadow is a special kind of StreamletImpl object:
+ * - It is still an StreamletImpl therefore it supports all Streamlet functions like filter()
+ * and map(), and can be the parent object of other StreamletImpl objects. Therefore,
+ * from API point of view, it can be used in the same way as a normal StreamletImpl object.
+ * - However it is just an shadow object of a real StreamletImpl object and it doesn't
+ * represent a node in the topology DAG. Therefore it can not be a child of another StreamletImpl
+ * object. As the result, the shadow object is clonable, and it is fine to create multiple
+ * StreamletShadow objects pointing to the same StreamletImpl object and have different properties.
+ *
+ * A StreamletShadow object can be used to decorate the real Streamletimpl object. This is
+ * important for children StreamletImpl objects to consume output data from the same parent in
+ * different ways, such as selecting different stream.
+ *
+ * Usage:
+ * To create a shadow object that selecting "test" stream from an existing StreamletImpl
+ * object(stream):
+ *
+ * StreamletImpl shadow = new StreamletShadow(stream) {
+ *   @Override
+ *   public String getStreamId() {
+ *     return "test";
+ *   }
+ * }
+ *
+ */
+public class StreamletShadow<R> extends StreamletImpl<R> {
+  private StreamletImpl<R> real;
+  // Extra properties for a Streamlet object
+  protected String streamId;
+
+  public StreamletShadow(StreamletImpl<R> real) {
+    this.real = real;
+    this.streamId = real.getStreamId();
+  }
+
+  /**
+   * Sets the stream id of this Streamlet.
+   * @param streamId the stream id for this streamlet.
+   */
+  public void setStreamId(String streamId) {
+    this.streamId = streamId;
+  }
+
+  /**
+   * Gets the stream id of this Streamlet.
+   * @return the stream id of this Streamlet
+   */
+  @Override
+  public String getStreamId() {
+    return streamId;
+  }
+
+  /*
+   * Functions accessible by child objects need to be overriden (forwarding the call to
+   * the real object since shadow object shouldn't have them)
+   */
+  @Override
+  public String getName() {
+    return real.getName();
+  }
+
+  @Override
+  public int getNumPartitions() {
+    return real.getNumPartitions();
+  }
+
+  /*
+   * Functions related to topology building need to be overriden.
+   */
+  @Override
+  public <T> void addChild(StreamletImpl<T> child) {
+    real.addChild(child);
+  }
+
+  @Override
+  public void build(TopologyBuilder bldr, Set<String> stageNames) {
+    StreamletUtils.require(false, "build() in StreamletShadow should NOT be invoked");
 
 Review comment:
   Would throwing `UnsupportedOperationException` be more appropriate here? The same applies to **Row**: 111.
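
   To make the suggestion concrete, here is a minimal sketch of what I have in mind. `Node` and `Shadow` are hypothetical stand-ins for `StreamletImpl` and `StreamletShadow`, and the `build()` signature is simplified (the real one also takes a `TopologyBuilder`), so treat this as an illustration rather than a drop-in change:

```java
import java.util.HashSet;
import java.util.Set;

public class ShadowBuildSketch {
  // Hypothetical stand-in for StreamletImpl; the real class has many more members.
  static class Node {
    public void build(Set<String> stageNames) {
      stageNames.add("node");
    }
  }

  // Hypothetical stand-in for StreamletShadow: it is not a DAG node,
  // so build() is an operation it does not support.
  static class Shadow extends Node {
    @Override
    public void build(Set<String> stageNames) {
      throw new UnsupportedOperationException(
          "build() in StreamletShadow should NOT be invoked");
    }
  }

  public static void main(String[] args) {
    try {
      new Shadow().build(new HashSet<>());
    } catch (UnsupportedOperationException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

   The idea is that the exception type itself tells the caller the method is intentionally unsupported on a shadow object, instead of relying on a precondition check that is hard-coded to fail.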

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
