[ https://issues.apache.org/jira/browse/SAMOA-16?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14513705#comment-14513705 ]
ASF GitHub Bot commented on SAMOA-16:
-------------------------------------
Github user senorcarbone commented on a diff in the pull request:
https://github.com/apache/incubator-samoa/pull/11#discussion_r29128256
--- Diff: samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java ---
@@ -0,0 +1,91 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2015 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.topology.AbstractStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import java.io.Serializable;
+
+
+/**
+ * A stream for SAMOA based on Apache Flink's DataStream
+ */
+public class FlinkStream extends AbstractStream implements FlinkComponent, Serializable {
+
+ private static int outputCounter = 0;
+ private FlinkComponent procItem;
+ private transient DataStream<SamoaType> dataStream;
+ private int sourcePiId;
+ private String flinkStreamId;
+
+ public FlinkStream(FlinkComponent sourcePi) {
+ this.procItem = sourcePi;
+ this.sourcePiId = sourcePi.getComponentId();
+ setStreamId("stream-" + Integer.toString(outputCounter));
+ flinkStreamId = "stream-" + Integer.toString(outputCounter);
+ outputCounter++;
+ }
+
+ @Override
+ public void initialise() {
+ if (procItem instanceof FlinkProcessingItem) {
+ dataStream = procItem.getOutStream().filter(Utils.getFilter(getStreamId()));
+ } else
+ dataStream = procItem.getOutStream();
+ }
+
+ @Override
+ public boolean canBeInitialised() {
+ return procItem.isInitialised();
--- End diff --
Since Apache Flink has no named streams, we tag each event with its stream
name and filter per consumer task according to that task's subscription.
This yields the same behavior as Storm's or Samza's named streams, and
performance is the same, since filter operators are chained in the same
thread.
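
To make the tag-and-filter idea concrete, here is a minimal sketch in plain Java, without any Flink dependency. The names TaggedEvent, filterFor and consume are hypothetical and exist only for illustration; they are not part of SAMOA or Flink, and the sketch does not claim to match how Utils.getFilter is implemented, since that helper is not shown in this diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class TaggedStreamSketch {

    /** Hypothetical event type: a payload tagged with the id of its logical stream. */
    public static final class TaggedEvent {
        public final String streamId;
        public final String payload;

        public TaggedEvent(String streamId, String payload) {
            this.streamId = streamId;
            this.payload = payload;
        }
    }

    /** Builds the per-subscription filter: keep only events tagged with the given stream id. */
    public static Predicate<TaggedEvent> filterFor(final String streamId) {
        return event -> streamId.equals(event.streamId);
    }

    /** Simulates one consumer reading the producer's single output and keeping only its own stream. */
    public static List<String> consume(List<TaggedEvent> producerOutput, String subscribedStreamId) {
        Predicate<TaggedEvent> filter = filterFor(subscribedStreamId);
        List<String> received = new ArrayList<>();
        for (TaggedEvent event : producerOutput) {
            if (filter.test(event)) {
                received.add(event.payload);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // One physical output carrying events of two logical streams.
        List<TaggedEvent> output = Arrays.asList(
                new TaggedEvent("stream-0", "a"),
                new TaggedEvent("stream-1", "b"),
                new TaggedEvent("stream-0", "c"));

        System.out.println(consume(output, "stream-0")); // prints [a, c]
        System.out.println(consume(output, "stream-1")); // prints [b]
    }
}
```

Each consumer sees the producer's whole output but keeps only the events whose tag matches its subscription, which is the effect a named stream would give.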
> Add an adapter for Apache Flink-Streaming
> -----------------------------------------
>
> Key: SAMOA-16
> URL: https://issues.apache.org/jira/browse/SAMOA-16
> Project: SAMOA
> Issue Type: New Feature
> Reporter: Paris Carbone
> Assignee: Gianmarco De Francisci Morales
>
> Apache Flink-Streaming is a new system for distributed stream processing
> built for unique and flexible high-level stream transformations. A Flink
> adapter for SAMOA should be able to translate a SAMOA Task topology into
> Flink streaming transformations. Some of the challenges are compositional
> topology support, cycle detection, and the translation of cycles to Flink
> iterations.