szlta commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r464303366



##########
File path: 
llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
##########
@@ -250,18 +255,71 @@ public void testGetOrcTailForPath() throws Exception {
     Configuration jobConf = new Configuration();
     Configuration daemonConf = new Configuration();
     CacheTag tag = CacheTag.build("test-table");
-    OrcTail uncached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, 
tag, daemonConf, cache);
+    OrcTail uncached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, 
tag, daemonConf, cache, null);
     jobConf.set(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname, "true");
-    OrcTail cached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, 
tag, daemonConf, cache);
+    OrcTail cached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, 
tag, daemonConf, cache, null);
     assertEquals(uncached.getSerializedTail(), cached.getSerializedTail());
     assertEquals(uncached.getFileTail(), cached.getFileTail());
   }
 
+  @Test
+  public void testGetOrcTailForPathWithFileId() throws Exception {
+    DummyMemoryManager mm = new DummyMemoryManager();
+    DummyCachePolicy cp = new DummyCachePolicy();
+    final int MAX_ALLOC = 64;
+    LlapDaemonCacheMetrics metrics = LlapDaemonCacheMetrics.create("", "");
+    BuddyAllocator alloc = new BuddyAllocator(
+        false, false, 8, MAX_ALLOC, 1, 4096, 0, null, mm, metrics, null, true);
+    MetadataCache cache = new MetadataCache(alloc, mm, cp, true, metrics);
+
+    Path path = new Path("../data/files/alltypesorc");
+    Configuration jobConf = new Configuration();
+    Configuration daemonConf = new Configuration();
+    CacheTag tag = CacheTag.build("test-table");
+    FileSystem fs = FileSystem.get(daemonConf);
+    FileStatus fileStatus = fs.getFileStatus(path);
+    OrcTail uncached = 
OrcEncodedDataReader.getOrcTailForPath(fileStatus.getPath(), jobConf, tag, 
daemonConf, cache, new SyntheticFileId(fileStatus));
+    jobConf.set(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname, "true");
+    // this should work from the cache, by recalculating the same fileId
+    OrcTail cached = 
OrcEncodedDataReader.getOrcTailForPath(fileStatus.getPath(), jobConf, tag, 
daemonConf, cache, null);
+    assertEquals(uncached.getSerializedTail(), cached.getSerializedTail());
+    assertEquals(uncached.getFileTail(), cached.getFileTail());
+  }
+
+  @Test
+  public void testGetOrcTailForPathWithFileIdChange() throws Exception {
+    DummyMemoryManager mm = new DummyMemoryManager();
+    DummyCachePolicy cp = new DummyCachePolicy();
+    final int MAX_ALLOC = 64;
+    LlapDaemonCacheMetrics metrics = LlapDaemonCacheMetrics.create("", "");
+    BuddyAllocator alloc = new BuddyAllocator(
+        false, false, 8, MAX_ALLOC, 1, 4096, 0, null, mm, metrics, null, true);
+    MetadataCache cache = new MetadataCache(alloc, mm, cp, true, metrics);
+
+    Path path = new Path("../data/files/alltypesorc");
+    Configuration jobConf = new Configuration();
+    Configuration daemonConf = new Configuration();
+    CacheTag tag = CacheTag.build("test-table");
+    OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, 
cache, new SyntheticFileId(path, 100, 100));
+    jobConf.set(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname, "true");
+    Exception ex = null;
+    try {
+      // this should miss the cache, since the fileKey changed
+      OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, 
cache, new SyntheticFileId(path, 100, 101));

Review comment:
       You can add a _fail_ call here, as it should always jump from line 308 
to the catch clause.

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
##########
@@ -680,14 +681,15 @@ public void setBaseAndInnerReader(
    * @param path The Orc file path we want to get the OrcTail for
    * @param conf The Configuration to access LLAP
    * @param cacheTag The cacheTag needed to get OrcTail from LLAP IO cache
+   * @param fileKey fileId of the Orc file (either the Long fileId of HDFS or 
the SyntheticFileId)
    * @return ReaderData object where the orcTail is not null. Reader can be 
null, but if we had to create
    * one we return that as well for further reuse.
    */
-  private static ReaderData getOrcTail(Path path, Configuration conf, CacheTag 
cacheTag) throws IOException {
+  private static ReaderData getOrcTail(Path path, Configuration conf, CacheTag 
cacheTag, Object fileKey) throws IOException {

Review comment:
       I'm fine with leaving just this one endpoint, with fileKey being 
optionally null.
   Please do emphasize the optionality of the fileKey arg in the javadoc, i.e. 
that it will be generated if not given, etc.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
##########
@@ -118,70 +126,217 @@
      */
     private long visibilityTxnId;
 
+    private List<DeltaFileMetaData> deltaFiles;
+
     public DeltaMetaData() {
-      this(0,0,new ArrayList<Integer>(), 0);
+      this(0, 0, new ArrayList<>(), 0, new ArrayList<>());
     }
+
     /**
+     * @param minWriteId min writeId of the delta directory
+     * @param maxWriteId max writeId of the delta directory
      * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in 
the same partition
      * @param visibilityTxnId maybe 0, if the dir name didn't have it.  
txnid:0 is always visible
+     * @param deltaFiles bucketFiles in the directory
      */
-    DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, 
long visibilityTxnId) {
+    public DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> 
stmtIds, long visibilityTxnId,
+        List<DeltaFileMetaData> deltaFiles) {
       this.minWriteId = minWriteId;
       this.maxWriteId = maxWriteId;
       if (stmtIds == null) {
         throw new IllegalArgumentException("stmtIds == null");
       }
       this.stmtIds = stmtIds;
       this.visibilityTxnId = visibilityTxnId;
+      this.deltaFiles = ObjectUtils.defaultIfNull(deltaFiles, new 
ArrayList<>());
     }
-    long getMinWriteId() {
+
+    public long getMinWriteId() {
       return minWriteId;
     }
-    long getMaxWriteId() {
+
+    public long getMaxWriteId() {
       return maxWriteId;
     }
-    List<Integer> getStmtIds() {
+
+    public List<Integer> getStmtIds() {
       return stmtIds;
     }
-    long getVisibilityTxnId() {
+
+    public long getVisibilityTxnId() {
       return visibilityTxnId;
     }
+
+    public List<DeltaFileMetaData> getDeltaFiles() {
+      return deltaFiles;
+    }
+
+    public List<DeltaFileMetaData> getDeltaFilesForStmtId(final Integer 
stmtId) {
+      if (stmtIds.size() <= 1 || stmtId == null) {
+        // If it is not a multistatement delta, we do not store the stmtId in 
the file list
+        return deltaFiles;
+      } else {
+        return deltaFiles.stream().filter(df -> 
stmtId.equals(df.getStmtId())).collect(Collectors.toList());
+      }
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
       out.writeLong(minWriteId);
       out.writeLong(maxWriteId);
       out.writeInt(stmtIds.size());
-      for(Integer id : stmtIds) {
+      for (Integer id : stmtIds) {
         out.writeInt(id);
       }
       out.writeLong(visibilityTxnId);
+      out.writeInt(deltaFiles.size());
+      for (DeltaFileMetaData fileMeta : deltaFiles) {
+        fileMeta.write(out);
+      }
     }
+
     @Override
     public void readFields(DataInput in) throws IOException {
       minWriteId = in.readLong();
       maxWriteId = in.readLong();
       stmtIds.clear();
       int numStatements = in.readInt();
-      for(int i = 0; i < numStatements; i++) {
+      for (int i = 0; i < numStatements; i++) {
         stmtIds.add(in.readInt());
       }
       visibilityTxnId = in.readLong();
+
+      deltaFiles.clear();
+      int numFiles = in.readInt();
+      for (int i = 0; i < numFiles; i++) {
+        DeltaFileMetaData file = new DeltaFileMetaData();
+        file.readFields(in);
+        deltaFiles.add(file);
+      }
     }
-    String getName() {
+
+    private String getName() {
       assert stmtIds.isEmpty() : "use getName(int)";
-      return AcidUtils.addVisibilitySuffix(AcidUtils
-          .deleteDeltaSubdir(minWriteId, maxWriteId), visibilityTxnId);
+      return 
AcidUtils.addVisibilitySuffix(AcidUtils.deleteDeltaSubdir(minWriteId, 
maxWriteId), visibilityTxnId);
     }
-    String getName(int stmtId) {
+
+    private String getName(int stmtId) {
       assert !stmtIds.isEmpty() : "use getName()";
       return AcidUtils.addVisibilitySuffix(AcidUtils
           .deleteDeltaSubdir(minWriteId, maxWriteId, stmtId), visibilityTxnId);
     }
+
+    public List<Pair<Path, Integer>> getPaths(Path root) {
+      if (stmtIds.isEmpty()) {
+        return Collections.singletonList(new ImmutablePair<>(new Path(root, 
getName()), null));
+      } else {
+        // To support multistatement transactions we may have multiple 
directories corresponding to one DeltaMetaData
+        return getStmtIds().stream()
+            .map(stmtId -> new ImmutablePair<>(new Path(root, 
getName(stmtId)), stmtId)).collect(Collectors.toList());
+      }
+    }
+
     @Override
     public String toString() {
       return "Delta(?," + minWriteId + "," + maxWriteId + "," + stmtIds + "," 
+ visibilityTxnId + ")";
     }
   }
+
+  final class DeltaFileMetaData implements Writable {
+    private static final int HAS_LONG_FILEID_FLAG = 1;
+    private static final int HAS_ATTEMPTID_FLAG = 2;
+    private static final int HAS_STMTID_FLAG = 4;
+
+    private long modTime;
+    private long length;
+    // Optional
+    private Integer attemptId;
+    // Optional
+    private Long fileId;
+    // Optional, if the deltaMeta contains multiple stmtIds, it will contain 
this files parent's stmtId
+    private Integer stmtId;
+
+    public DeltaFileMetaData() {
+    }
+
+    public DeltaFileMetaData(HadoopShims.HdfsFileStatusWithId fileStatus, 
Integer stmtId) {
+      modTime = fileStatus.getFileStatus().getModificationTime();
+      length = fileStatus.getFileStatus().getLen();
+      String attempt = 
AcidUtils.parseAttemptId(fileStatus.getFileStatus().getPath());
+      attemptId = StringUtils.isEmpty(attempt) ? null : 
Integer.parseInt(attempt);
+      fileId = fileStatus.getFileId();
+      this.stmtId = stmtId;
+    }
+
+    public DeltaFileMetaData(long modTime, long length, @Nullable Integer 
attemptId, @Nullable Long fileId,
+        @Nullable Integer stmtId) {
+      this.modTime = modTime;
+      this.length = length;
+      this.attemptId = attemptId;
+      this.fileId = fileId;
+      this.stmtId = stmtId;
+    }
+
+    public void clearStmtId() {
+      stmtId = null;
+    }
+
+    public Integer getStmtId() {
+      return stmtId;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      int flags = (fileId != null ? HAS_LONG_FILEID_FLAG : 0) |
+          (attemptId != null ? HAS_ATTEMPTID_FLAG : 0) |
+          (stmtId != null ? HAS_STMTID_FLAG : 0);
+      out.writeByte(flags);
+      out.writeLong(modTime);
+      out.writeLong(length);
+      if (attemptId != null) {
+        out.writeInt(attemptId);
+      }
+      if (fileId != null) {
+        out.writeLong(fileId);
+      }
+      if (stmtId != null) {
+        out.writeInt(stmtId);
+      }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      byte flags = in.readByte();
+      boolean hasLongFileId = (HAS_LONG_FILEID_FLAG & flags) != 0,
+          hasAttemptId = (HAS_ATTEMPTID_FLAG & flags) != 0,
+          hasStmtId = (HAS_STMTID_FLAG & flags) != 0;
+      modTime = in.readLong();
+      length = in.readLong();
+      if (hasAttemptId) {
+        attemptId = in.readInt();
+      }
+      if (hasLongFileId) {
+        fileId = in.readLong();
+      }
+      if (hasStmtId) {
+        stmtId = in.readInt();
+      }
+    }
+
+    public Object getFileId(Path deltaDirectory, int bucketId) {
+      if (fileId != null) {
+        return fileId;
+      }
+      // Calculate the synthetic fileid
+      Path realPath = getPath(deltaDirectory, bucketId);
+      return new SyntheticFileId(realPath, length, modTime);
+    }

Review comment:
       The generation of the fileId may be more complicated, depending on what 
configuration is given. Please refer to 
https://github.com/apache/hive/blob/master/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java#L496-L517




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to