nsivabalan commented on code in PR #6815:
URL: https://github.com/apache/hudi/pull/6815#discussion_r1083143655
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -488,25 +489,24 @@ public void remove() {
   private static FSDataInputStream getFSDataInputStream(FileSystem fs,
                                                         HoodieLogFile logFile,
                                                         int bufferSize) throws IOException {
-    FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
+    FSDataInputStream inputStream = fs.open(logFile.getPath(), bufferSize);
+    FSDataInputStream targetInputStream;
     if (FSUtils.isGCSFileSystem(fs)) {
       // in GCS FS, we might need to intercept seek offsets as we might get EOF exception
-      return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, logFile, bufferSize), true);
-    }
-
-    if (FSUtils.isCHDFileSystem(fs)) {
-      return new BoundedFsDataInputStream(fs, logFile.getPath(), fsDataInputStream);
-    }
-
-    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
-      return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
-          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
+      targetInputStream = new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(inputStream, logFile, bufferSize), true);
+    } else if (FSUtils.isCHDFileSystem(fs)) {
+      targetInputStream = new BoundedFsDataInputStream(fs, logFile.getPath(), inputStream);
+    } else if (inputStream.getWrappedStream() instanceof FSInputStream) {
+      targetInputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
+          new BufferedFSInputStream((FSInputStream) inputStream.getWrappedStream(), bufferSize)));
+    } else {
+      // inputStream.getWrappedStream() may be a BufferedFSInputStream;
+      // need to wrap in another BufferedFSInputStream to make bufferSize work?
+      targetInputStream = inputStream;
     }
-    // fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream
-    // need to wrap in another BufferedFSInputStream to make bufferSize work?
-    return fsDataInputStream;
+    return new LeakTrackingFSDataInputStream(targetInputStream);
Review Comment:
so, are we enabling this leak-tracking FS input stream by default? I thought it
was only for debugging purposes and wouldn't be enabled for end-user consumption.
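For illustration, here is a minimal, self-contained sketch of the single-exit wrapping pattern this hunk moves to: each branch picks a scheme-specific wrapper, and the outer (leak-tracking) wrapper is applied exactly once on the way out. The stream classes below are stand-ins for the Hadoop/Hudi types, not the real ones.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.InputStream;

class WrapChainSketch {
  // stand-ins for SchemeAwareFSDataInputStream / BoundedFsDataInputStream /
  // LeakTrackingFSDataInputStream
  static class SchemeAware extends FilterInputStream { SchemeAware(InputStream in) { super(in); } }
  static class Bounded extends FilterInputStream { Bounded(InputStream in) { super(in); } }
  static class LeakTracking extends FilterInputStream { LeakTracking(InputStream in) { super(in); } }

  static InputStream open(InputStream raw, boolean isGcs, boolean isChd) {
    InputStream target;
    if (isGcs) {
      target = new SchemeAware(raw);
    } else if (isChd) {
      target = new Bounded(raw);
    } else {
      target = raw;
    }
    // single exit point: every branch picks up the outer wrapper exactly once
    return new LeakTracking(target);
  }

  public static void main(String[] args) {
    InputStream s = open(new ByteArrayInputStream(new byte[0]), false, false);
    System.out.println(s instanceof LeakTracking);
  }
}
```

The single exit point is what makes the unconditional `LeakTrackingFSDataInputStream` wrap possible, which is exactly what the comment above is questioning.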
##########
hudi-common/src/main/java/org/apache/hudi/util/Transient.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.function.SerializableSupplier;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.Serializable;
+
+/**
+ * {@link Serializable} counterpart of {@link Lazy}
+ *
+ * @param <T> type of the object being held by {@link Transient}
+ */
+@ThreadSafe
+public class Transient<T> implements Serializable {
Review Comment:
do we have tests for this class?
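As a sketch of the kind of unit test being asked for: `Transient` should (a) run its initializer once and cache the result, and (b) re-run the initializer after deserialization, since `ref` is transient. Since the class itself isn't reproduced here, the example uses a minimal stand-in with the same contract; names are illustrative only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

class TransientSketchTest {
  // minimal stand-in for org.apache.hudi.util.Transient
  static class Holder<T> implements Serializable {
    interface SerSupplier<T> extends Serializable { T get(); }
    private transient volatile boolean initialized;
    private transient T ref;
    private final SerSupplier<T> initializer;
    Holder(SerSupplier<T> s) { this.initializer = s; }
    synchronized T get() {
      if (!initialized) { ref = initializer.get(); initialized = true; }
      return ref;
    }
  }

  public static void main(String[] args) throws Exception {
    AtomicInteger calls = new AtomicInteger();
    Holder<String> lazy = new Holder<>(() -> "v" + calls.incrementAndGet());
    // initializer runs once; subsequent calls return the cached value
    System.out.println(lazy.get().equals("v1") && lazy.get().equals("v1"));

    // serialization drops the transient ref; get() re-runs the initializer
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    new ObjectOutputStream(bos).writeObject(lazy);
    @SuppressWarnings("unchecked")
    Holder<String> copy = (Holder<String>) new ObjectInputStream(
        new ByteArrayInputStream(bos.toByteArray())).readObject();
    System.out.println(copy.get().equals("v2"));
  }
}
```

A real test would also exercise concurrent `get()` calls to verify the double-checked locking in `Transient`.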
##########
hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java:
##########
@@ -1064,4 +1057,22 @@ public void createImmutableFileInPath(Path fullPath, Option<byte[]> content)
   public FileSystem getFileSystem() {
     return fileSystem;
   }
+
+  private static FSDataInputStream wrapLeakTrackingStream(FSDataInputStream fsDataInputStream) throws IOException {
+    if (fsDataInputStream instanceof LeakTrackingFSDataInputStream) {
+      return fsDataInputStream;
+    }
+    return new LeakTrackingFSDataInputStream(fsDataInputStream);
+  }
+
+  private static FSDataInputStream wrapTimeTrackingStream(final Path path, FSDataInputStream fsDataInputStream) throws IOException {
+    if (fsDataInputStream instanceof TimedFSDataInputStream) {
+      return fsDataInputStream;
+    }
+    return new TimedFSDataInputStream(path, fsDataInputStream);
+  }
+
+  private static FSDataInputStream wrapStream(final Path path, FSDataInputStream fsDataInputStream) throws IOException {
+    return wrapLeakTrackingStream(wrapTimeTrackingStream(path, fsDataInputStream));
Review Comment:
we should introduce a diagnostics/safe mode and enable such leak tracking only
when that mode is on. as we might know, this additional tracking could add
unnecessary overhead IMO.
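A hedged sketch of that suggestion: only apply the leak-tracking wrapper when an explicit diagnostics flag is on, so the default path carries no extra wrapper. The flag name `hoodie.fs.diagnostics.enabled` is made up for illustration (a real change would presumably add a `ConfigProperty`), and the stream class is a stand-in.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.InputStream;

class DiagnosticsGateSketch {
  // stand-in for LeakTrackingFSDataInputStream
  static class LeakTracking extends FilterInputStream {
    LeakTracking(InputStream in) { super(in); }
  }

  static InputStream wrapStream(InputStream in, boolean diagnosticsEnabled) {
    // skip wrapping when diagnostics is off; also avoid double-wrapping,
    // mirroring the instanceof guard in the PR
    if (!diagnosticsEnabled || in instanceof LeakTracking) {
      return in;
    }
    return new LeakTracking(in);
  }

  public static void main(String[] args) {
    // flag would come from e.g. "hoodie.fs.diagnostics.enabled" in real code
    InputStream raw = new ByteArrayInputStream(new byte[0]);
    System.out.println(wrapStream(raw, false) == raw);  // no overhead by default
    System.out.println(wrapStream(raw, true) instanceof LeakTracking);
  }
}
```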
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -203,14 +197,14 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(L
         return mergedRecords.stream()
             .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
+            .filter(Objects::nonNull)
             .iterator();
       } catch (IOException ioe) {
         throw new HoodieIOException("Error merging records from metadata table for " + sortedKeyPrefixes.size() + " key : ", ioe);
       } finally {
         closeReader(readers);
       }
-    })
-    .filter(Objects::nonNull);
+    });
}
@Override
Review Comment:
I see that for getRecordsByKeyPrefixes() we parallelize across file slices,
whereas getRecordsByKeys() uses a plain foreach (no parallelization).
wondering if we should parallelize the getRecordsByKeys() calls too?
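Just to illustrate the shape of the suggestion (in Hudi this would go through the engine context rather than `java.util.stream`, and `lookupInSlice` is a hypothetical name): fan the per-file-slice lookups out in parallel and flatten the results, analogous to what getRecordsByKeyPrefixes() already does per file.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ParallelLookupSketch {
  // stand-in for an HFile/log-merge key lookup within a single file slice
  static List<String> lookupInSlice(String slice, List<String> keys) {
    return keys.stream().map(k -> slice + ":" + k).collect(Collectors.toList());
  }

  static List<String> getRecordsByKeys(List<String> slices, List<String> keys) {
    // parallelStream() fans the per-slice work across the common ForkJoinPool;
    // the real change would use the Spark/engine context instead
    return slices.parallelStream()
        .flatMap(slice -> lookupInSlice(slice, keys).stream())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> out = getRecordsByKeys(Arrays.asList("s0", "s1"), Arrays.asList("k"));
    System.out.println(out.size());
  }
}
```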
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -73,8 +73,10 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {

   private static final Logger LOG = LogManager.getLogger(BaseTableMetadata.class);

-  public static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
-  public static final int BUFFER_SIZE = 10 * 1024 * 1024;
+  protected static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
+  // NOTE: Buffer-size is deliberately set pretty low, since MT internally is relying
+  //       on HFile (serving as persisted binary key-value mapping) to do caching
+  protected static final int BUFFER_SIZE = 10 * 1024; // 10Kb
Review Comment:
can we make this configurable? I'd guess that on HDFS 10 MB might be preferred.
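A minimal sketch of what "configurable" could look like. The property name `hoodie.metadata.buffer.size` is invented for illustration; a real change would add a `ConfigProperty` to the metadata config rather than read a system property.

```java
class BufferSizeSketch {
  static final int DEFAULT_BUFFER_SIZE = 10 * 1024; // 10 KB, as in the PR

  static int bufferSize() {
    // e.g. an HDFS deployment could pass -Dhoodie.metadata.buffer.size=10485760
    String v = System.getProperty("hoodie.metadata.buffer.size");
    return v == null ? DEFAULT_BUFFER_SIZE : Integer.parseInt(v);
  }

  public static void main(String[] args) {
    System.out.println(bufferSize());
  }
}
```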
##########
hudi-common/src/main/java/org/apache/hudi/util/Transient.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.function.SerializableSupplier;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.Serializable;
+
+/**
+ * {@link Serializable} counterpart of {@link Lazy}
+ *
+ * @param <T> type of the object being held by {@link Transient}
+ */
+@ThreadSafe
+public class Transient<T> implements Serializable {
+
+ private transient boolean initialized;
+
+ private final SerializableSupplier<T> initializer;
+ private transient T ref;
+
+ private Transient(SerializableSupplier<T> initializer) {
+ this.initializer = initializer;
+ this.ref = null;
+ this.initialized = false;
+ }
+
+ private Transient(T value, SerializableSupplier<T> initializer) {
+ this.initializer = initializer;
+ this.ref = value;
+ this.initialized = true;
+ }
+
+ public T get() {
+ if (!initialized) {
+ synchronized (this) {
+ if (!initialized) {
+ this.ref = initializer.get();
+ initialized = true;
+ }
+ }
+ }
+
Review Comment:
nit: remove the extra line break
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -205,6 +207,7 @@ public Map<Pair<String, String>, BloomFilter> getBloomFilters(final List<Pair<St
HoodieTimer timer = HoodieTimer.start();
Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+ // TODO simplify (no sorting is required)
Review Comment:
let's take care of any such cleanups (or remove the TODO) if need be
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -203,14 +197,14 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(L
         return mergedRecords.stream()
             .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
+            .filter(Objects::nonNull)
             .iterator();
       } catch (IOException ioe) {
         throw new HoodieIOException("Error merging records from metadata table for " + sortedKeyPrefixes.size() + " key : ", ioe);
       } finally {
         closeReader(readers);
       }
-    })
-    .filter(Objects::nonNull);
+    });
}
@Override
Review Comment:
for colstats, I do see we parallelize at a higher layer, but for other calls
I'm thinking we could try to leverage the Spark context to parallelize calls
across file slices. for the files partition, there is only one file slice, so
it might be a no-op. but for bloom
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]