nsivabalan commented on code in PR #6815:
URL: https://github.com/apache/hudi/pull/6815#discussion_r1083143655


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -488,25 +489,24 @@ public void remove() {
   private static FSDataInputStream getFSDataInputStream(FileSystem fs,
                                                         HoodieLogFile logFile,
                                                         int bufferSize) throws IOException {
-    FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
+    FSDataInputStream inputStream = fs.open(logFile.getPath(), bufferSize);
+    FSDataInputStream targetInputStream;
 
     if (FSUtils.isGCSFileSystem(fs)) {
       // in GCS FS, we might need to interceptor seek offsets as we might get EOF exception
-      return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, logFile, bufferSize), true);
-    }
-
-    if (FSUtils.isCHDFileSystem(fs)) {
-      return new BoundedFsDataInputStream(fs, logFile.getPath(), fsDataInputStream);
-    }
-
-    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
-      return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
-          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
+      targetInputStream = new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(inputStream, logFile, bufferSize), true);
+    } else if (FSUtils.isCHDFileSystem(fs)) {
+      targetInputStream = new BoundedFsDataInputStream(fs, logFile.getPath(), inputStream);
+    } else if (inputStream.getWrappedStream() instanceof FSInputStream) {
+      targetInputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
+          new BufferedFSInputStream((FSInputStream) inputStream.getWrappedStream(), bufferSize)));
+    } else {
+      // inputStream.getWrappedStream() maybe a BufferedFSInputStream
+      // need to wrap in another BufferedFSInputStream the make bufferSize work?
+      targetInputStream = inputStream;
     }
 
-    // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
-    // need to wrap in another BufferedFSInputStream the make bufferSize work?
-    return fsDataInputStream;
+    return new LeakTrackingFSDataInputStream(targetInputStream);

Review Comment:
   so, are we enabling this leak-tracking FS input stream by default? I thought it was only for debugging purposes and wouldn't be enabled for end-user consumption.


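For context, a leak-tracking stream of this kind typically just counts opens and closes, so anything left unclosed shows up as a non-zero count. A minimal self-contained sketch (a stand-in for illustration, not Hudi's actual `LeakTrackingFSDataInputStream`):

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for a leak-tracking stream wrapper: bump a counter on open,
// drop it on close, so leaked (never-closed) streams are observable.
public class LeakTrackingSketch {
  static final AtomicInteger OPEN_STREAMS = new AtomicInteger();

  static class TrackedStream extends FilterInputStream {
    TrackedStream(InputStream in) {
      super(in);
      OPEN_STREAMS.incrementAndGet();
    }

    @Override
    public void close() throws IOException {
      OPEN_STREAMS.decrementAndGet();
      super.close();
    }
  }

  public static void main(String[] args) throws IOException {
    InputStream a = new TrackedStream(new ByteArrayInputStream(new byte[] {1}));
    InputStream b = new TrackedStream(new ByteArrayInputStream(new byte[] {2}));
    a.close();
    // b is never closed: the counter exposes the leak.
    System.out.println("unclosed=" + OPEN_STREAMS.get());
  }
}
```

The wrapper itself is cheap (one atomic increment/decrement per stream), which is presumably why the PR enables it unconditionally; the review question is whether even that belongs on the default path.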

##########
hudi-common/src/main/java/org/apache/hudi/util/Transient.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.function.SerializableSupplier;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.Serializable;
+
+/**
+ * {@link Serializable} counterpart of {@link Lazy}
+ *
+ * @param <T> type of the object being held by {@link Transient}
+ */
+@ThreadSafe
+public class Transient<T> implements Serializable {

Review Comment:
   do we have tests for this class?
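A test for such a class would typically cover lazy initialization (supplier runs once, on first access) and re-initialization after a serialization round trip. A self-contained sketch using a simplified stand-in for `Transient` (the real class lives in hudi-common and may differ):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class TransientLikeTest {
  interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

  // Simplified stand-in for org.apache.hudi.util.Transient: the cached value
  // is transient and re-created by the serializable initializer after deserialization.
  static class Lazy<T> implements Serializable {
    private final SerializableSupplier<T> initializer;
    private transient volatile boolean initialized;
    private transient T ref;

    Lazy(SerializableSupplier<T> initializer) {
      this.initializer = initializer;
    }

    T get() {
      if (!initialized) {
        synchronized (this) {
          if (!initialized) {
            ref = initializer.get();
            initialized = true;
          }
        }
      }
      return ref;
    }
  }

  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T obj) throws Exception {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(obj);
    oos.flush();
    return (T) new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())).readObject();
  }

  public static void main(String[] args) throws Exception {
    AtomicInteger calls = new AtomicInteger();
    Lazy<String> lazy = new Lazy<>(() -> {
      calls.incrementAndGet();
      return "value";
    });
    // Supplier runs only on first access, and only once.
    if (calls.get() != 0) throw new AssertionError("eager init");
    if (!"value".equals(lazy.get()) || calls.get() != 1) throw new AssertionError("lazy init broken");
    lazy.get();
    if (calls.get() != 1) throw new AssertionError("supplier called twice");
    // After serialization the transient ref is dropped and re-created.
    Lazy<String> copy = roundTrip(lazy);
    if (!"value".equals(copy.get())) throw new AssertionError("re-init after deserialization failed");
    System.out.println("ok");
  }
}
```

A real test would also hammer `get()` from multiple threads to verify the double-checked locking only invokes the supplier once under contention.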



##########
hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java:
##########
@@ -1064,4 +1057,22 @@ public void createImmutableFileInPath(Path fullPath, Option<byte[]> content)
   public FileSystem getFileSystem() {
     return fileSystem;
   }
+
+  private static FSDataInputStream wrapLeakTrackingStream(FSDataInputStream fsDataInputStream) throws IOException {
+    if (fsDataInputStream instanceof LeakTrackingFSDataInputStream) {
+      return fsDataInputStream;
+    }
+    return new LeakTrackingFSDataInputStream(fsDataInputStream);
+  }
+
+  private static FSDataInputStream wrapTimeTrackingStream(final Path path, FSDataInputStream fsDataInputStream) throws IOException {
+    if (fsDataInputStream instanceof TimedFSDataInputStream) {
+      return fsDataInputStream;
+    }
+    return new TimedFSDataInputStream(path, fsDataInputStream);
+  }
+
+  private static FSDataInputStream wrapStream(final Path path, FSDataInputStream fsDataInputStream) throws IOException {
+    return wrapLeakTrackingStream(wrapTimeTrackingStream(path, fsDataInputStream));

Review Comment:
   we should introduce a diagnostics/safe mode and only enable such leak tracking when that mode is on. as we know, such additional tracking might add some unnecessary overhead IMO.
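The suggestion could be sketched as a mode switch around the wrapping: the cheap timing wrapper stays unconditional, while the heavier tracking layer is only stacked in diagnostics mode. The property name below is hypothetical, not an actual Hudi config key:

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Sketch of gating extra stream wrapping behind a diagnostics mode, so the
// default read path avoids the tracking overhead. Names are illustrative only.
public class DiagnosticsGateSketch {
  static boolean diagnosticsEnabled() {
    // "hudi.fs.diagnostics.enabled" is a made-up key for this sketch.
    return Boolean.parseBoolean(System.getProperty("hudi.fs.diagnostics.enabled", "false"));
  }

  static InputStream wrapStream(InputStream raw, boolean diagnostics) {
    // Stand-in for the timing wrapper: always applied, cost is a few counters.
    InputStream wrapped = new BufferedInputStream(raw);
    // Stand-in for the leak-tracking wrapper: only applied in diagnostics mode.
    return diagnostics ? new BufferedInputStream(wrapped) : wrapped;
  }

  public static void main(String[] args) throws IOException {
    InputStream in = wrapStream(new ByteArrayInputStream(new byte[] {42}), diagnosticsEnabled());
    // Reads behave identically either way; only the wrapping depth differs.
    System.out.println(in.read());
  }
}
```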



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -203,14 +197,14 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(L
 
               return mergedRecords.stream()
                 .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
+                .filter(Objects::nonNull)
                 .iterator();
             } catch (IOException ioe) {
               throw new HoodieIOException("Error merging records from metadata table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
             } finally {
               closeReader(readers);
             }
-          })
-        .filter(Objects::nonNull);
+          });
   }
 
   @Override

Review Comment:
   I see that for getRecordsByKeyPrefixes() we parallelize across file slices, whereas getRecordsByKeys() uses a plain forEach (no parallelism). wondering if we should parallelize the getRecordsByKeys() calls too?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -73,8 +73,10 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
 
   private static final Logger LOG = LogManager.getLogger(BaseTableMetadata.class);
 
-  public static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
-  public static final int BUFFER_SIZE = 10 * 1024 * 1024;
+  protected static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
+  // NOTE: Buffer-size is deliberately set pretty low, since MT internally is relying
+  //       on HFile (serving as persisted binary key-value mapping) to do caching
+  protected static final int BUFFER_SIZE = 10 * 1024; // 10Kb

Review Comment:
   can we make this configurable? I guess on HDFS 10 MB might be preferred.
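Making the constant configurable could look roughly like the following sketch, which keeps the PR's 10 KB default and lets an HDFS deployment override it. The config key is hypothetical, not an existing Hudi option (Hudi would express this via its own `ConfigProperty` machinery):

```java
import java.util.Properties;

// Sketch: replace the hard-coded BUFFER_SIZE with a property-driven value.
public class BufferSizeConfigSketch {
  // Hypothetical key, for illustration only.
  static final String BUFFER_SIZE_KEY = "hoodie.metadata.read.buffer.size";
  static final int DEFAULT_BUFFER_SIZE = 10 * 1024; // 10 KB, the PR's default

  static int bufferSize(Properties props) {
    String v = props.getProperty(BUFFER_SIZE_KEY);
    return v == null ? DEFAULT_BUFFER_SIZE : Integer.parseInt(v);
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    System.out.println(bufferSize(props)); // default applies when unset
    // e.g. an HDFS deployment bumping the buffer to 10 MB:
    props.setProperty(BUFFER_SIZE_KEY, String.valueOf(10 * 1024 * 1024));
    System.out.println(bufferSize(props));
  }
}
```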



##########
hudi-common/src/main/java/org/apache/hudi/util/Transient.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.function.SerializableSupplier;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.Serializable;
+
+/**
+ * {@link Serializable} counterpart of {@link Lazy}
+ *
+ * @param <T> type of the object being held by {@link Transient}
+ */
+@ThreadSafe
+public class Transient<T> implements Serializable {
+
+  private transient boolean initialized;
+
+  private final SerializableSupplier<T> initializer;
+  private transient T ref;
+
+  private Transient(SerializableSupplier<T> initializer) {
+    this.initializer = initializer;
+    this.ref = null;
+    this.initialized = false;
+  }
+
+  private Transient(T value, SerializableSupplier<T> initializer) {
+    this.initializer = initializer;
+    this.ref = value;
+    this.initialized = true;
+  }
+
+  public T get() {
+    if (!initialized) {
+      synchronized (this) {
+        if (!initialized) {
+          this.ref = initializer.get();
+          initialized = true;
+        }
+      }
+    }
+

Review Comment:
   nit: remove the extra line break



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -205,6 +207,7 @@ public Map<Pair<String, String>, BloomFilter> getBloomFilters(final List<Pair<St
     HoodieTimer timer = HoodieTimer.start();
     Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
     Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+    // TODO simplify (no sorting is required)

Review Comment:
   let's take care of any leftover cleanup TODOs like this if need be



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -203,14 +197,14 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(L
 
               return mergedRecords.stream()
                 .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
+                .filter(Objects::nonNull)
                 .iterator();
             } catch (IOException ioe) {
               throw new HoodieIOException("Error merging records from metadata table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
             } finally {
               closeReader(readers);
             }
-          })
-        .filter(Objects::nonNull);
+          });
   }
 
   @Override

Review Comment:
   for colstats, I do see we parallelize at a higher layer, but for other calls I'm thinking we could try to leverage the Spark context to parallelize the calls across file slices. for the files partition there is only one file slice, so it might be a no-op. but for bloom
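The fan-out idea could be sketched with plain Java parallel streams (in Hudi this would go through `HoodieEngineContext` / the Spark context instead, and the per-slice lookup would open the base/log readers for that slice; both helpers here are hypothetical):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch of parallelizing key lookups across file slices and merging results.
public class ParallelLookupSketch {
  // Hypothetical per-slice lookup; stands in for reading one file slice's
  // base file and log files and doing the point lookups there.
  static List<String> lookupInSlice(String slice, List<String> keys) {
    return keys.stream().map(k -> slice + ":" + k).collect(Collectors.toList());
  }

  static List<String> lookupAcrossSlices(List<String> slices, List<String> keys) {
    return slices.parallelStream()                          // one task per file slice
        .flatMap(slice -> lookupInSlice(slice, keys).stream())
        .collect(Collectors.toList());                      // merge per-slice results
  }

  public static void main(String[] args) {
    List<String> out = lookupAcrossSlices(Arrays.asList("slice-0", "slice-1"),
                                          Arrays.asList("key-a", "key-b"));
    System.out.println(out.size()); // 2 slices x 2 keys
  }
}
```

As the comment notes, with a single file slice (the files partition) this degenerates to the sequential path, so the win would only show up for multi-slice partitions such as bloom filters or column stats.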



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
