[ 
https://issues.apache.org/jira/browse/FLINK-3889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15286332#comment-15286332
 ] 

ASF GitHub Bot commented on FLINK-3889:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63494863
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java
 ---
    @@ -0,0 +1,368 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.io.CheckpointableInputFormat;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.operators.TimestampedCollector;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Queue;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * This is the operator that reads the splits received from {@link 
FileSplitMonitoringFunction}.
    + * This operator will receive just the split descriptors and then read and 
emit records. This may lead
    + * to backpressure. To avoid this, we will have another thread actually 
reading the splits and
    + * another forwarding the checkpoint barriers. The two should sync so that 
the checkpoints reflect the
    + * current state.
    + * */
    +public class FileSplitReadOperator<OUT, S extends Serializable> extends 
AbstractStreamOperator<OUT>
    +   implements OneInputStreamOperator<FileInputSplit, OUT> {
    +
    +   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitReadOperator.class);
    +
    +   private static final FileInputSplit EOF = new FileInputSplit(-1, null, 
-1, -1, null);
    +
    +   private transient SplitReader<S, OUT> reader;
    +   private transient TimestampedCollector<OUT> collector;
    +
    +   private Configuration configuration;
    +   private FileInputFormat<OUT> format;
    +   private TypeInformation<OUT> typeInfo;
    +
    +   private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
    +
    +   public FileSplitReadOperator(FileInputFormat<OUT> format, 
TypeInformation<OUT> typeInfo, Configuration configuration) {
    +           this.format = checkNotNull(format);
    +           this.typeInfo = checkNotNull(typeInfo);
    +           this.configuration = checkNotNull(configuration);
    +   }
    +
    +   @Override
    +   public void open() throws Exception {
    +           super.open();
    +
    +           this.format.configure(configuration);
    +           this.collector = new TimestampedCollector<>(output);
    +
    +           TypeSerializer<OUT> serializer = 
typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
    +           Object checkpointLock = getContainingTask().getCheckpointLock();
    +
    +           this.reader = new SplitReader<>(format, serializer, collector, 
checkpointLock);
    +           this.reader.setReaderState(this.readerState);
    +           this.reader.start();
    +           this.readerState = null;
    +   }
    +
    +   @Override
    +   public void processElement(StreamRecord<FileInputSplit> element) throws 
Exception {
    +           reader.addSplit(element.getValue());
    +   }
    +
    +   @Override
    +   public void processWatermark(Watermark mark) throws Exception {
    +           output.emitWatermark(mark);
    +   }
    +
    +   @Override
    +   public void dispose() {
    +           super.dispose();
    +
    +           // first try to cancel it properly and
    +           // give it some time until it finishes
    +           reader.cancel();
    +           try {
    +                   reader.join(200);
    +           } catch (InterruptedException e) {
    +                   // we can ignore this
    +           }
    +
    +           // if the above did not work, then interrupt the thread 
repeatedly
    +           while (reader.isAlive()) {
    +
    +                   StringBuilder bld = new StringBuilder();
    +                   StackTraceElement[] stack = reader.getStackTrace();
    +                   for (StackTraceElement e : stack) {
    +                           bld.append(e).append('\n');
    +                   }
    +                   LOG.warn("The reader is stuck in method:\n {}", 
bld.toString());
    +
    +                   reader.interrupt();
    +                   try {
    +                           reader.join(50);
    +                   } catch (InterruptedException e) {
    +                           // we can ignore this
    +                   }
    +           }
    +           reader = null;
    +           collector = null;
    +           configuration = null;
    +           format = null;
    +           typeInfo = null;
    +   }
    +
    +   @Override
    +   public void close() throws Exception {
    +           super.close();
    +
    +           // signal that no more splits will come, wait for the reader to 
finish
    +           // and close the collector. Further cleaning up is handled by 
the dispose().
    +
    +           if (reader != null && reader.isAlive() && reader.isRunning()) {
    +                   // add a dummy element to signal that no more splits 
will
    +                   // arrive and wait until the reader finishes
    +                   reader.addSplit(EOF);
    +                   reader.join();
    +           }
    +           collector.close();
    +   }
    +
    +   private class SplitReader<S extends Serializable, OT> extends Thread {
    +
    +           private volatile boolean isRunning;
    +
    +           private final FileInputFormat<OT> format;
    +           private final TypeSerializer<OT> serializer;
    +
    +           private final Object checkpointLock;
    +           private final TimestampedCollector<OT> collector;
    +
    +           private final Object lock = new Object();
    +
    +           private final Queue<FileInputSplit> pendingSplits;
    +
    +           SplitReader(FileInputFormat<OT> format,
    +                                   TypeSerializer<OT> serializer,
    +                                   TimestampedCollector<OT> collector,
    +                                   Object checkpointLock) {
    +
    +                   this.format = checkNotNull(format, "Unspecified 
FileInputFormat.");
    +                   this.serializer = checkNotNull(serializer, "Unspecified 
Serialized.");
    +
    +                   this.pendingSplits = new LinkedList<>();
    +                   this.collector = collector;
    +                   this.checkpointLock = checkpointLock;
    +                   this.isRunning = true;
    +           }
    +
    +           void addSplit(FileInputSplit split) {
    +                   Preconditions.checkNotNull(split);
    +                   synchronized (lock) {
    +                           this.pendingSplits.add(split);
    +                   }
    +           }
    +
    +           public boolean isRunning() {
    +                   return this.isRunning;
    +           }
    +
    +           @Override
    +           public void run() {
    +                   FileInputSplit split = null;
    +                   try {
    +                           while (this.isRunning) {
    +
    +                                   // get the next split to read.
    +                                   // locking is needed because 
checkpointing will
    +                                   // ask for a consistent snapshot of the 
list.
    +                                   synchronized (lock) {
    --- End diff --
    
    The lock seems to be required only later; here we are not changing the 
queue.


> Make File Monitoring Function checkpointable.
> ---------------------------------------------
>
>                 Key: FLINK-3889
>                 URL: https://issues.apache.org/jira/browse/FLINK-3889
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>
> This is essentially the combination of FLINK-3808 and FLINK-3717.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to