sankarh commented on a change in pull request #791: HIVE-21924
URL: https://github.com/apache/hive/pull/791#discussion_r328981037
 
 

 ##########
 File path: 
ql/src/java/org/apache/hadoop/hive/ql/io/SkippingTextInputFormat.java
 ##########
 @@ -0,0 +1,136 @@
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * SkippingTextInputFormat is a header/footer aware input format. It truncates
+ * splits identified by TextInputFormat. Headers and footers are removed
+ * from the splits.
+ */
+public class SkippingTextInputFormat extends TextInputFormat {
+
+  private final Map<Path, Long> startIndexMap = new ConcurrentHashMap<Path, 
Long>();
+  private final Map<Path, Long> endIndexMap = new ConcurrentHashMap<Path, 
Long>();
+  private JobConf conf;
+  private int headerCount;
+  private int footerCount;
+
+  /**
+   * Keeps a reference to the job configuration and then delegates to the
+   * parent implementation. The stored configuration is what this class later
+   * uses to resolve the file system of each path it inspects.
+   *
+   * @param conf job configuration to remember and pass on
+   */
+  @Override
+  public void configure(JobConf conf) {
+    this.conf = conf;
+    super.configure(conf);
+  }
+
+  /**
+   * Configures this input format and records how many lines at the start and
+   * at the end of each file are to be treated as header and footer.
+   *
+   * @param conf job configuration, forwarded to {@link #configure(JobConf)}
+   * @param headerCount number of header lines per file
+   * @param footerCount number of footer lines per file
+   */
+  public void configure(JobConf conf, int headerCount, int footerCount) {
+    configure(conf);
+    this.footerCount = footerCount;
+    this.headerCount = headerCount;
+  }
+
+  /**
+   * Builds the split for the proposed byte range after cutting away the part
+   * of it that falls inside the header or footer of the file.
+   *
+   * @param file file the split belongs to
+   * @param start byte offset at which the proposed split begins
+   * @param length number of bytes in the proposed split
+   * @param hosts hosts handed through to the parent implementation
+   * @return a split restricted to data rows, or a
+   *         NullRowsInputFormat.DummyInputSplit when the proposed range lies
+   *         entirely inside the header or the footer
+   */
+  @Override
+  protected FileSplit makeSplit(Path file, long start, long length,
+      String[] hosts) {
+    long[] range = trimToDataRange(file, start, length);
+    if (range == null) {
+      return new NullRowsInputFormat.DummyInputSplit(file);
+    }
+    return super.makeSplit(file, range[0], range[1], hosts);
+  }
+
+  /**
+   * Same as {@link #makeSplit(Path, long, long, String[])}, additionally
+   * handing the in-memory host list through to the parent implementation.
+   *
+   * @param file file the split belongs to
+   * @param start byte offset at which the proposed split begins
+   * @param length number of bytes in the proposed split
+   * @param hosts hosts handed through to the parent implementation
+   * @param inMemoryHosts in-memory hosts handed through to the parent
+   *        implementation
+   * @return a split restricted to data rows, or a
+   *         NullRowsInputFormat.DummyInputSplit when the proposed range lies
+   *         entirely inside the header or the footer
+   */
+  @Override
+  protected FileSplit makeSplit(Path file, long start, long length,
+      String[] hosts, String[] inMemoryHosts) {
+    long[] range = trimToDataRange(file, start, length);
+    if (range == null) {
+      return new NullRowsInputFormat.DummyInputSplit(file);
+    }
+    return super.makeSplit(file, range[0], range[1], hosts, inMemoryHosts);
+  }
+
+  /**
+   * Shared trimming logic of both makeSplit overloads.
+   *
+   * @param file file the proposed split belongs to
+   * @param start byte offset at which the proposed split begins
+   * @param length number of bytes in the proposed split
+   * @return a two-element array {start, length} describing the trimmed
+   *         range, or null when nothing of the proposed range is left
+   */
+  private long[] trimToDataRange(Path file, long start, long length) {
+    long cachedStart = getCachedStartIndex(file);
+    long cachedEnd = getCachedEndIndex(file);
+    if (cachedStart > start + length) {
+      // the whole proposed range is header
+      return null;
+    }
+    if (cachedStart > start) {
+      length = length - (cachedStart - start);
+      start = cachedStart;
+    }
+    // getCachedEndIndex appears to use -1 as its "no end index" marker. In
+    // that case the end of the split is left untouched; applying -1 as a
+    // real offset would yield a negative length.
+    if (cachedEnd > -1) {
+      if (cachedEnd < start) {
+        // the whole remaining range is footer
+        return null;
+      }
+      if (cachedEnd < start + length) {
+        length = cachedEnd - start;
+      }
+    }
+    return new long[] {start, length};
+  }
+
+  /**
+   * Returns the byte offset at which splits of the given file should start
+   * so that the header lines are skipped. The value is computed on first use
+   * by reading headerCount lines from the file and is cached per path.
+   *
+   * @param path file whose start offset is requested
+   * @return 0 when no header lines are configured or the file cannot be
+   *         read; otherwise the position after the last header line minus 1
+   */
+  private long getCachedStartIndex(Path path) {
+    Long startIndexForFile = startIndexMap.get(path);
+    if (startIndexForFile == null) {
+      // Default for "no header": the map rejects null values, so a concrete
+      // offset has to be stored even when nothing is read.
+      startIndexForFile = 0L;
+      if (headerCount > 0) {
+        try (FSDataInputStream fis = path.getFileSystem(conf).open(path)) {
+          for (int j = 0; j < headerCount; j++) {
+            fis.readLine();
+          }
+          // back 1 byte because readers skip the entire first row if split
+          // start is not 0
+          startIndexForFile = fis.getPos() - 1;
+        } catch (IOException e) {
+          // best effort: fall back to the beginning of the file
+          startIndexForFile = 0L;
+        }
+      }
+      startIndexMap.put(path, startIndexForFile);
+    }
+    return startIndexForFile;
+  }
+
+  private long getCachedEndIndex(Path path) {
+    Long endIndexForFile = endIndexMap.get(path);
+    if (endIndexForFile == null) {
+      try {
+        final long bufferSectionSize = 1024;
+        long bufferSectionEnd = 
path.getFileSystem(conf).getFileStatus(path).getLen();
+        long bufferSectionStart = Math.max(0, bufferSectionEnd - 
bufferSectionSize);
+        Queue<Long> lineEndBuffer = new ArrayDeque<Long>(headerCount + 1);
+        FSDataInputStream fis = path.getFileSystem(conf).open(path);
+        fis.seek(bufferSectionStart);
+        while (bufferSectionEnd > bufferSectionStart) {
+          fis.seek(bufferSectionStart);
+          long pos = fis.getPos();
+          while (pos < bufferSectionEnd) {
+            fis.readLine();
+            pos = fis.getPos();
+            if (pos <= bufferSectionEnd) {
+              if (lineEndBuffer.size() > footerCount) {
+                lineEndBuffer.poll();
+              }
+              lineEndBuffer.add(pos);
+            }
+          }
+          if (lineEndBuffer.size() > footerCount) {
+            break;
+          } else {
+            bufferSectionEnd = bufferSectionStart;
+            bufferSectionStart = Math.max(0, bufferSectionEnd - 
bufferSectionSize);
+          }
+        }
+        if (lineEndBuffer.size() > footerCount) {
+          endIndexForFile = lineEndBuffer.poll();
+        } else {
+          endIndexForFile = -1L;
+        }
+      } catch (IOException e) {
+        endIndexForFile = -1L;
 
 Review comment:
   The caller doesn't handle -1L well. It would be better to throw an exception
here, as we don't expect this to fail.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to