[ 
https://issues.apache.org/jira/browse/FLINK-1977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14532684#comment-14532684
 ] 

ASF GitHub Bot commented on FLINK-1977:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/659#discussion_r29854565
  
    --- Diff: 
flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
 ---
    @@ -1,149 +1,149 @@
    -/*
    - * Licensed to the Apache Software Foundation (ASF) under one or more
    - * contributor license agreements.  See the NOTICE file distributed with
    - * this work for additional information regarding copyright ownership.
    - * The ASF licenses this file to You under the Apache License, Version 2.0
    - * (the "License"); you may not use this file except in compliance with
    - * the License.  You may obtain a copy of the License at
    - *
    - *    http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    -
    -package org.apache.flink.streaming.connectors.flume;
    -
    -import java.util.List;
    -
    -import org.apache.flink.streaming.api.datastream.DataStream;
    -import org.apache.flink.streaming.connectors.ConnectorSource;
    -import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    -import org.apache.flink.util.Collector;
    -import org.apache.flume.Context;
    -import org.apache.flume.channel.ChannelProcessor;
    -import org.apache.flume.source.AvroSource;
    -import org.apache.flume.source.avro.AvroFlumeEvent;
    -import org.apache.flume.source.avro.Status;
    -
    -public class FlumeSource<OUT> extends ConnectorSource<OUT> {
    -   private static final long serialVersionUID = 1L;
    -
    -   String host;
    -   String port;
    -   volatile boolean finished = false;
    -
    -   private volatile boolean isRunning = false;
    -
    -   FlumeSource(String host, int port, DeserializationSchema<OUT> 
deserializationSchema) {
    -           super(deserializationSchema);
    -           this.host = host;
    -           this.port = Integer.toString(port);
    -   }
    -
    -   public class MyAvroSource extends AvroSource {
    -           Collector<OUT> collector;
    -
    -           /**
    -            * Sends the AvroFlumeEvent from it's argument list to the 
Apache Flink
    -            * {@link DataStream}.
    -            * 
    -            * @param avroEvent
    -            *            The event that should be sent to the dataStream
    -            * @return A {@link Status}.OK message if sending the event was
    -            *         successful.
    -            */
    -           @Override
    -           public Status append(AvroFlumeEvent avroEvent) {
    -                   collect(avroEvent);
    -                   return Status.OK;
    -           }
    -
    -           /**
    -            * Sends the AvroFlumeEvents from it's argument list to the 
Apache Flink
    -            * {@link DataStream}.
    -            * 
    -            * @param events
    -            *            The events that is sent to the dataStream
    -            * @return A Status.OK message if sending the events was 
successful.
    -            */
    -           @Override
    -           public Status appendBatch(List<AvroFlumeEvent> events) {
    -                   for (AvroFlumeEvent avroEvent : events) {
    -                           collect(avroEvent);
    -                   }
    -
    -                   return Status.OK;
    -           }
    -
    -           /**
    -            * Deserializes the AvroFlumeEvent before sending it to the 
Apache Flink
    -            * {@link DataStream}.
    -            * 
    -            * @param avroEvent
    -            *            The event that is sent to the dataStream
    -            */
    -           private void collect(AvroFlumeEvent avroEvent) {
    -                   byte[] b = avroEvent.getBody().array();
    -                   OUT out = FlumeSource.this.schema.deserialize(b);
    -
    -                   if (schema.isEndOfStream(out)) {
    -                           FlumeSource.this.finished = true;
    -                           this.stop();
    -                           FlumeSource.this.notifyAll();
    -                   } else {
    -                           collector.collect(out);
    -                   }
    -
    -           }
    -
    -   }
    -
    -   MyAvroSource avroSource;
    -
    -   /**
    -    * Configures the AvroSource. Also sets the collector so the 
application can
    -    * use it from outside of the invoke function.
    -    * 
    -    * @param collector
    -    *            The collector used in the invoke function
    -    */
    -   public void configureAvroSource(Collector<OUT> collector) {
    -
    -           avroSource = new MyAvroSource();
    -           avroSource.collector = collector;
    -           Context context = new Context();
    -           context.put("port", port);
    -           context.put("bind", host);
    -           avroSource.configure(context);
    -           // An instance of a ChannelProcessor is required for 
configuring the
    -           // avroSource although it will not be used in this case.
    -           ChannelProcessor cp = new ChannelProcessor(null);
    -           avroSource.setChannelProcessor(cp);
    -   }
    -
    -   /**
    -    * Configures the AvroSource and runs until the user calls a close 
function.
    -    * 
    -    * @param collector
    -    *            The Collector for sending data to the datastream
    -    */
    -   @Override
    -   public void run(Collector<OUT> collector) throws Exception {
    -           isRunning = true;
    -           configureAvroSource(collector);
    -           avroSource.start();
    -           while (!finished && isRunning) {
    -                   this.wait();
    -           }
    -   }
    -
    -   @Override
    -   public void cancel() {
    -           isRunning = false;
    -   }
    -
    -}
    +///*
    +// * Licensed to the Apache Software Foundation (ASF) under one or more
    +// * contributor license agreements.  See the NOTICE file distributed with
    +// * this work for additional information regarding copyright ownership.
    +// * The ASF licenses this file to You under the Apache License, Version 
2.0
    +// * (the "License"); you may not use this file except in compliance with
    +// * the License.  You may obtain a copy of the License at
    +// *
    +// *    http://www.apache.org/licenses/LICENSE-2.0
    +// *
    +// * Unless required by applicable law or agreed to in writing, software
    +// * distributed under the License is distributed on an "AS IS" BASIS,
    +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
    +// * See the License for the specific language governing permissions and
    +// * limitations under the License.
    +// */
    +//
    +//package org.apache.flink.streaming.connectors.flume;
    +//
    +//import java.util.List;
    +//
    +//import org.apache.flink.streaming.api.datastream.DataStream;
    +//import org.apache.flink.streaming.connectors.ConnectorSource;
    +//import 
org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +//import org.apache.flink.util.Collector;
    +//import org.apache.flume.Context;
    +//import org.apache.flume.channel.ChannelProcessor;
    +//import org.apache.flume.source.AvroSource;
    +//import org.apache.flume.source.avro.AvroFlumeEvent;
    +//import org.apache.flume.source.avro.Status;
    +//
    +//public class FlumeSource<OUT> extends ConnectorSource<OUT> {
    +// private static final long serialVersionUID = 1L;
    +//
    +// String host;
    +// String port;
    +// volatile boolean finished = false;
    +//
    +// private volatile boolean isRunning = false;
    +//
    +// FlumeSource(String host, int port, DeserializationSchema<OUT> 
deserializationSchema) {
    +//         super(deserializationSchema);
    +//         this.host = host;
    +//         this.port = Integer.toString(port);
    +// }
    +//
    +// public class MyAvroSource extends AvroSource {
    +//         Collector<OUT> output;
    +//
    +//         /**
    +//          * Sends the AvroFlumeEvent from it's argument list to the 
Apache Flink
    +//          * {@link DataStream}.
    +//          *
    +//          * @param avroEvent
    +//          *            The event that should be sent to the dataStream
    +//          * @return A {@link Status}.OK message if sending the event was
    +//          *         successful.
    +//          */
    +//         @Override
    +//         public Status append(AvroFlumeEvent avroEvent) {
    +//                 collect(avroEvent);
    +//                 return Status.OK;
    +//         }
    +//
    +//         /**
    +//          * Sends the AvroFlumeEvents from it's argument list to the 
Apache Flink
    +//          * {@link DataStream}.
    +//          *
    +//          * @param events
    +//          *            The events that is sent to the dataStream
    +//          * @return A Status.OK message if sending the events was 
successful.
    +//          */
    +//         @Override
    +//         public Status appendBatch(List<AvroFlumeEvent> events) {
    +//                 for (AvroFlumeEvent avroEvent : events) {
    +//                         collect(avroEvent);
    +//                 }
    +//
    +//                 return Status.OK;
    +//         }
    +//
    +//         /**
    +//          * Deserializes the AvroFlumeEvent before sending it to the 
Apache Flink
    +//          * {@link DataStream}.
    +//          *
    +//          * @param avroEvent
    +//          *            The event that is sent to the dataStream
    +//          */
    +//         private void collect(AvroFlumeEvent avroEvent) {
    +//                 byte[] b = avroEvent.getBody().array();
    +//                 OUT out = FlumeSource.this.schema.deserialize(b);
    +//
    +//                 if (schema.isEndOfStream(out)) {
    +//                         FlumeSource.this.finished = true;
    +//                         this.stop();
    +//                         FlumeSource.this.notifyAll();
    +//                 } else {
    +//                         output.collect(out);
    --- End diff --
    
    No, this one is a bit harder since the Flume source has its own loop.


> Rework Stream Operators to always be push based
> -----------------------------------------------
>
>                 Key: FLINK-1977
>                 URL: https://issues.apache.org/jira/browse/FLINK-1977
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> This is a result of the discussion on the mailing list. This is an excerpt 
> from the mailing list that gives the basic idea of the change:
> I propose to change all streaming operators to be push based, with a
> slightly improved interface: In addition to collect(), which I would
> call receiveElement() I would add receivePunctuation() and
> receiveBarrier(). The first operator in the chain would also get data
> from the outside invokable that reads from the input iterator and
> calls receiveElement() for the first operator in a chain.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to