[
https://issues.apache.org/jira/browse/SAMOA-16?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14513681#comment-14513681
]
ASF GitHub Bot commented on SAMOA-16:
-------------------------------------
Github user gdfm commented on a diff in the pull request:
https://github.com/apache/incubator-samoa/pull/11#discussion_r29126767
--- Diff: samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java
---
@@ -0,0 +1,129 @@
+package com.yahoo.labs.flink;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2015 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.github.javacliparser.ClassOption;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.CircleDetection;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.flink.topology.impl.FlinkComponentFactory;
+import com.yahoo.labs.flink.topology.impl.FlinkProcessingItem;
+import com.yahoo.labs.flink.topology.impl.FlinkStream;
+import com.yahoo.labs.flink.topology.impl.FlinkTopology;
+import com.yahoo.labs.samoa.tasks.Task;
+import org.apache.flink.api.java.tuple.Tuple3;
+import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+
+/**
+ * Main class to run a SAMOA on Apache Flink
+ */
+public class FlinkDoTask {
+
+ private static final Logger logger =
LoggerFactory.getLogger(FlinkDoTask.class);
+ public static List<List<FlinkProcessingItem>> circles ;
+ public static List<Integer> circleTails = new ArrayList<Integer>();
+
+
+ public static void main(String[] args) throws Exception {
+ List<String> tmpArgs = new
ArrayList<String>(Arrays.asList(args));
+
+ args = tmpArgs.toArray(new String[0]);
+
+ // Init Task
+ StringBuilder cliString = new StringBuilder();
+ for (int i = 0; i < args.length; i++) {
+ cliString.append(" ").append(args[i]);
+ }
+ logger.debug("Command line string = {}", cliString.toString());
+ System.out.println("Command line string = " +
cliString.toString());
+
+ Task task;
+ try {
+ task =
ClassOption.cliStringToObject(cliString.toString(), Task.class, null);
+ logger.debug("Successfully instantiating {}",
task.getClass().getCanonicalName());
+ } catch (Exception e) {
+ logger.error("Failed to initialize the task: ", e);
+ System.out.println("Failed to initialize the task: " +
e);
+ return;
+ }
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ logger.debug("Creating the factory\n");
+ task.setFactory(new FlinkComponentFactory(env));
+
+ logger.debug("Going to initialize the task\n");
+ task.init();
+
+ circles = extractTopologyGraph((FlinkTopology)
task.getTopology());
+
+ logger.debug("Going to build the topology\n");
+ ((FlinkTopology) task.getTopology()).build();
+
+ logger.debug("Execute environment\n");
+ env.execute();
+
+ }
+
+ private static List<List<FlinkProcessingItem>>
extractTopologyGraph(FlinkTopology topology){
+ List<FlinkProcessingItem> pis =
Lists.newArrayList(Iterables.filter(topology.getProcessingItems(),
FlinkProcessingItem.class));
+ List<Integer>[] graph = new List[pis.size()];
+ FlinkProcessingItem[] processingItems = new
FlinkProcessingItem[pis.size()];
+ List<List<FlinkProcessingItem>> piCircles = new ArrayList<>();
+
+
+ for (int i=0;i<pis.size();i++) {
+ graph[i] = new ArrayList<Integer>();
+ }
+ //construct the graph of the topology for the Processing Items
(No entrance pi is included)
+ for (FlinkProcessingItem pi: pis) {
+ processingItems[pi.getComponentId()] = pi;
+ for (Tuple3<FlinkStream, Utils.Partitioning, Integer>
is : pi.getInputStreams()) {
+ if (is.f2 != -1)
graph[is.f2].add(pi.getComponentId());
+ }
+ }
+ for (int g=0;g<graph.length;g++)
+ logger.debug(graph[g].toString());
+
+ CircleDetection detCircles = new CircleDetection();
+ List<List<Integer>> circles = detCircles.getCircles(graph);
+
+ //update PIs, regarding being part fo a circle.
--- End diff --
Typo: 'fo' should be 'of'.
> Add an adapter for Apache Flink-Streaming
> -----------------------------------------
>
> Key: SAMOA-16
> URL: https://issues.apache.org/jira/browse/SAMOA-16
> Project: SAMOA
> Issue Type: New Feature
> Reporter: Paris Carbone
> Assignee: Gianmarco De Francisci Morales
>
> Apache Flink-Streaming is a new system for distributed stream processing
> built for unique and flexible high-level stream transformations. A Flink
> adapter for Samoa should be able to translate a Samoa Task topology into
> Flink streaming transformations. Some of the challenges are support for
> compositional topologies, and the detection of circles and their
> translation to Flink iterations.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)