[
https://issues.apache.org/jira/browse/FLINK-1977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14532684#comment-14532684
]
ASF GitHub Bot commented on FLINK-1977:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/659#discussion_r29854565
--- Diff:
flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
---
@@ -1,149 +1,149 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.flume;
-
-import java.util.List;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.util.Collector;
-import org.apache.flume.Context;
-import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.source.AvroSource;
-import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.apache.flume.source.avro.Status;
-
-public class FlumeSource<OUT> extends ConnectorSource<OUT> {
- private static final long serialVersionUID = 1L;
-
- String host;
- String port;
- volatile boolean finished = false;
-
- private volatile boolean isRunning = false;
-
- FlumeSource(String host, int port, DeserializationSchema<OUT>
deserializationSchema) {
- super(deserializationSchema);
- this.host = host;
- this.port = Integer.toString(port);
- }
-
- public class MyAvroSource extends AvroSource {
- Collector<OUT> collector;
-
- /**
- * Sends the AvroFlumeEvent from it's argument list to the
Apache Flink
- * {@link DataStream}.
- *
- * @param avroEvent
- * The event that should be sent to the dataStream
- * @return A {@link Status}.OK message if sending the event was
- * successful.
- */
- @Override
- public Status append(AvroFlumeEvent avroEvent) {
- collect(avroEvent);
- return Status.OK;
- }
-
- /**
- * Sends the AvroFlumeEvents from it's argument list to the
Apache Flink
- * {@link DataStream}.
- *
- * @param events
- * The events that is sent to the dataStream
- * @return A Status.OK message if sending the events was
successful.
- */
- @Override
- public Status appendBatch(List<AvroFlumeEvent> events) {
- for (AvroFlumeEvent avroEvent : events) {
- collect(avroEvent);
- }
-
- return Status.OK;
- }
-
- /**
- * Deserializes the AvroFlumeEvent before sending it to the
Apache Flink
- * {@link DataStream}.
- *
- * @param avroEvent
- * The event that is sent to the dataStream
- */
- private void collect(AvroFlumeEvent avroEvent) {
- byte[] b = avroEvent.getBody().array();
- OUT out = FlumeSource.this.schema.deserialize(b);
-
- if (schema.isEndOfStream(out)) {
- FlumeSource.this.finished = true;
- this.stop();
- FlumeSource.this.notifyAll();
- } else {
- collector.collect(out);
- }
-
- }
-
- }
-
- MyAvroSource avroSource;
-
- /**
- * Configures the AvroSource. Also sets the collector so the
application can
- * use it from outside of the invoke function.
- *
- * @param collector
- * The collector used in the invoke function
- */
- public void configureAvroSource(Collector<OUT> collector) {
-
- avroSource = new MyAvroSource();
- avroSource.collector = collector;
- Context context = new Context();
- context.put("port", port);
- context.put("bind", host);
- avroSource.configure(context);
- // An instance of a ChannelProcessor is required for
configuring the
- // avroSource although it will not be used in this case.
- ChannelProcessor cp = new ChannelProcessor(null);
- avroSource.setChannelProcessor(cp);
- }
-
- /**
- * Configures the AvroSource and runs until the user calls a close
function.
- *
- * @param collector
- * The Collector for sending data to the datastream
- */
- @Override
- public void run(Collector<OUT> collector) throws Exception {
- isRunning = true;
- configureAvroSource(collector);
- avroSource.start();
- while (!finished && isRunning) {
- this.wait();
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version
2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.streaming.connectors.flume;
+//
+//import java.util.List;
+//
+//import org.apache.flink.streaming.api.datastream.DataStream;
+//import org.apache.flink.streaming.connectors.ConnectorSource;
+//import
org.apache.flink.streaming.util.serialization.DeserializationSchema;
+//import org.apache.flink.util.Collector;
+//import org.apache.flume.Context;
+//import org.apache.flume.channel.ChannelProcessor;
+//import org.apache.flume.source.AvroSource;
+//import org.apache.flume.source.avro.AvroFlumeEvent;
+//import org.apache.flume.source.avro.Status;
+//
+//public class FlumeSource<OUT> extends ConnectorSource<OUT> {
+// private static final long serialVersionUID = 1L;
+//
+// String host;
+// String port;
+// volatile boolean finished = false;
+//
+// private volatile boolean isRunning = false;
+//
+// FlumeSource(String host, int port, DeserializationSchema<OUT>
deserializationSchema) {
+// super(deserializationSchema);
+// this.host = host;
+// this.port = Integer.toString(port);
+// }
+//
+// public class MyAvroSource extends AvroSource {
+// Collector<OUT> output;
+//
+// /**
+// * Sends the AvroFlumeEvent from it's argument list to the
Apache Flink
+// * {@link DataStream}.
+// *
+// * @param avroEvent
+// * The event that should be sent to the dataStream
+// * @return A {@link Status}.OK message if sending the event was
+// * successful.
+// */
+// @Override
+// public Status append(AvroFlumeEvent avroEvent) {
+// collect(avroEvent);
+// return Status.OK;
+// }
+//
+// /**
+// * Sends the AvroFlumeEvents from it's argument list to the
Apache Flink
+// * {@link DataStream}.
+// *
+// * @param events
+// * The events that is sent to the dataStream
+// * @return A Status.OK message if sending the events was
successful.
+// */
+// @Override
+// public Status appendBatch(List<AvroFlumeEvent> events) {
+// for (AvroFlumeEvent avroEvent : events) {
+// collect(avroEvent);
+// }
+//
+// return Status.OK;
+// }
+//
+// /**
+// * Deserializes the AvroFlumeEvent before sending it to the
Apache Flink
+// * {@link DataStream}.
+// *
+// * @param avroEvent
+// * The event that is sent to the dataStream
+// */
+// private void collect(AvroFlumeEvent avroEvent) {
+// byte[] b = avroEvent.getBody().array();
+// OUT out = FlumeSource.this.schema.deserialize(b);
+//
+// if (schema.isEndOfStream(out)) {
+// FlumeSource.this.finished = true;
+// this.stop();
+// FlumeSource.this.notifyAll();
+// } else {
+// output.collect(out);
--- End diff --
No, this one is a bit harder since the Flume source has its own loop.
> Rework Stream Operators to always be push based
> -----------------------------------------------
>
> Key: FLINK-1977
> URL: https://issues.apache.org/jira/browse/FLINK-1977
> Project: Flink
> Issue Type: Improvement
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> This issue is the result of a discussion on the mailing list. The following excerpt
> from that discussion gives the basic idea of the change:
> I propose to change all streaming operators to be push-based, with a
> slightly improved interface: in addition to collect(), which I would
> call receiveElement(), I would add receivePunctuation() and
> receiveBarrier(). The first operator in the chain would also get data
> from the outside invokable, which reads from the input iterator and
> calls receiveElement() on the first operator in the chain.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)