[ 
https://issues.apache.org/jira/browse/FLINK-3889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15286318#comment-15286318
 ] 

ASF GitHub Bot commented on FLINK-3889:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1984#discussion_r63493915
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
    @@ -0,0 +1,345 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.JobException;
    +import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
    + * to downstream tasks for further reading and processing. Which splits 
will be further processed
    + * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
    + */
    +@Internal
    +public class FileSplitMonitoringFunction<OUT>
    +   extends RichSourceFunction<FileInputSplit> implements 
Checkpointed<Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, 
List<FileInputSplit>>, Long>> {
    +
    +   private static final long serialVersionUID = 1L;
    +
    +   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
    +
    +   /**
    +    * Specifies when computation will be triggered.
    +    */
    +   public enum WatchType {
    +           PROCESS_ONCE,                           // Processes the 
current content of a file/path only ONCE, and stops monitoring.
    +           REPROCESS_WITH_APPENDED         // Reprocesses the whole file 
when new data is appended.
    +   }
    +
    +   /** The path to monitor. */
    +   private final String path;
    +
    +   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
    +   private final int readerParallelism;
    +
    +   /** The {@link FileInputFormat} to be read. */
    +   private FileInputFormat<OUT> format;
    +
    +   /** How often to monitor the state of the directory for new data. */
    +   private final long interval;
    +
    +   /** Which new data to process (see {@link WatchType}. */
    +   private final WatchType watchType;
    +
    +   private List<Tuple2<Long, List<FileInputSplit>>> 
splitsToFwdOrderedAscByModTime;
    +
    +   private Tuple2<Long, List<FileInputSplit>> currentSplitsToFwd;
    +
    +   private long globalModificationTime;
    +
    +   private FilePathFilter pathFilter;
    +
    +   private volatile boolean isRunning = true;
    +
    +   /**
    +    * This is the {@link Configuration} to be used to initialize the input 
format at the reader
    +    * (see {@link #open(Configuration)}). In the codebase, whenever {@link 
#open(Configuration)} is called,
    +    * it is passed a new configuration, thus ignoring potential 
user-specified parameters. Now, we pass a
    +    * configuration object at the constructor, which is shipped to the 
remote tasks.
    +    * */
    +   private Configuration configuration;
    +
    +   public FileSplitMonitoringFunction(
    +           FileInputFormat<OUT> format, String path, Configuration 
configuration,
    +           WatchType watchType, int readerParallelism, long interval) {
    +
    +           this(format, path, configuration, 
FilePathFilter.DefaultFilter.getInstance(), watchType, readerParallelism, 
interval);
    +   }
    +
    +   public FileSplitMonitoringFunction(
    +           FileInputFormat<OUT> format, String path, Configuration 
configuration,
    +           FilePathFilter filter, WatchType watchType, int 
readerParallelism, long interval) {
    +
    +           this.format = Preconditions.checkNotNull(format);
    +           this.path = Preconditions.checkNotNull(path);
    +           this.configuration = Preconditions.checkNotNull(configuration);
    +
    +           Preconditions.checkArgument(interval >= 100,
    +                   "The specified monitoring interval is smaller than the 
minimum allowed one (100 ms).");
    +           this.interval = interval;
    +
    +           this.watchType = watchType;
    +
    +           this.pathFilter = Preconditions.checkNotNull(filter);
    +
    +           this.readerParallelism = Math.max(readerParallelism, 1);
    +           this.globalModificationTime = Long.MIN_VALUE;
    +   }
    +
    +   @Override
    +   @SuppressWarnings("unchecked")
    +   public void open(Configuration parameters) throws Exception {
    +           LOG.info("Opening File Monitoring Source.");
    +           
    +           super.open(parameters);
    +           format.configure(this.configuration);
    +   }
    +
    +   @Override
    +   public void run(SourceFunction.SourceContext<FileInputSplit> context) 
throws Exception {
    +           FileSystem fileSystem = FileSystem.get(new URI(path));
    +
    +           switch (watchType) {
    +                   case REPROCESS_WITH_APPENDED:
    +                           while (isRunning) {
    +                                   monitorDirAndForwardSplits(fileSystem, 
context);
    +                                   Thread.sleep(interval);
    +                           }
    +                           isRunning = false;
    +                           break;
    +                   case PROCESS_ONCE:
    +                           monitorDirAndForwardSplits(fileSystem, context);
    +                           isRunning = false;
    +                           break;
    +                   default:
    +                           isRunning = false;
    +                           throw new RuntimeException("Unknown WatchType" 
+ watchType);
    +           }
    +   }
    +
    +   public boolean isRunning() {
    +           return this.isRunning;
    +   }
    +
    +   private void monitorDirAndForwardSplits(FileSystem fs, 
SourceContext<FileInputSplit> context) throws IOException, JobException {
    +           final Object lock = context.getCheckpointLock();
    +
    +           // it may be non-null in the case of a recovery after a failure.
    +           if (currentSplitsToFwd != null) {
    +                   synchronized (lock) {
    +                           forwardSplits(currentSplitsToFwd, context);
    +                   }
    +           }
    +           currentSplitsToFwd = null;
    +
    +           // it may be non-null in the case of a recovery after a failure.
    +           if (splitsToFwdOrderedAscByModTime == null) {
    +                   splitsToFwdOrderedAscByModTime = 
getInputSplitSortedOnModTime(fs);
    +           }
    +
    +           Iterator<Tuple2<Long, List<FileInputSplit>>> it =
    +                   splitsToFwdOrderedAscByModTime.iterator();
    +
    +           while (it.hasNext()) {
    +                   synchronized (lock) {
    +                           currentSplitsToFwd = it.next();
    +                           it.remove();
    +                           forwardSplits(currentSplitsToFwd, context);
    +                   }
    +           }
    +
    +           // set them to null to distinguish from a restore.
    +           splitsToFwdOrderedAscByModTime = null;
    +           currentSplitsToFwd = null;
    +   }
    +
    +   private void forwardSplits(Tuple2<Long, List<FileInputSplit>> 
splitsToFwd, SourceContext<FileInputSplit> context) {
    +           currentSplitsToFwd = splitsToFwd;
    +           Long modTime = currentSplitsToFwd.f0;
    +           List<FileInputSplit> splits = currentSplitsToFwd.f1;
    +
    +           Iterator<FileInputSplit> it = splits.iterator();
    +           while (it.hasNext()) {
    +                   FileInputSplit split = it.next();
    +                   processSplit(split, context);
    +                   it.remove();
    +           }
    +
    +           // update the global modification time
    +           if (modTime >= globalModificationTime) {
    +                   globalModificationTime = modTime;
    +           }
    +   }
    +
    +   private void processSplit(FileInputSplit split, 
SourceContext<FileInputSplit> context) {
    +           LOG.info("Forwarding split: " + split);
    +           context.collect(split);
    +   }
    +
    +   private List<Tuple2<Long, List<FileInputSplit>>> 
getInputSplitSortedOnModTime(FileSystem fileSystem) throws IOException {
    +           List<FileStatus> eligibleFiles = listEligibleFiles(fileSystem);
    +           if (eligibleFiles.isEmpty()) {
    +                   return new ArrayList<>();
    +           }
    +
    +           Map<Long, List<FileInputSplit>> splitsToForward = 
getInputSplits(eligibleFiles);
    +           List<Tuple2<Long, List<FileInputSplit>>> sortedSplitsToForward 
= new ArrayList<>();
    +
    +           for (Map.Entry<Long, List<FileInputSplit>> entry : 
splitsToForward.entrySet()) {
    +                   sortedSplitsToForward.add(new Tuple2<>(entry.getKey(), 
entry.getValue()));
    +           }
    +
    +           Collections.sort(sortedSplitsToForward, new 
Comparator<Tuple2<Long, List<FileInputSplit>>>() {
    +                   @Override
    +                   public int compare(Tuple2<Long, List<FileInputSplit>> 
o1, Tuple2<Long, List<FileInputSplit>> o2) {
    +                           return (int) (o1.f0 - o2.f0);
    +                   }
    +           });
    +
    +           return sortedSplitsToForward;
    +   }
    +
    +   /**
    +    * Creates the input splits for the path to be assigned to the 
downstream tasks.
    --- End diff --
    
    The `files` parameter should be called `eligibleFiles`, right? Also, the 
comment seems to indicate that the files in the list are not used, whereas they 
are the ones that are actually used.


> Make File Monitoring Function checkpointable.
> ---------------------------------------------
>
>                 Key: FLINK-3889
>                 URL: https://issues.apache.org/jira/browse/FLINK-3889
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>
> This is essentially the combination of FLINK-3808 and FLINK-3717.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to