vinothchandar commented on a change in pull request #2189:
URL: https://github.com/apache/hudi/pull/2189#discussion_r511039214



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
##########
@@ -18,87 +18,98 @@
 
 package org.apache.hudi.common.metrics;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hudi.common.util.ReflectionUtils;
+
 
 /**
- * Lightweight Metrics Registry to track Hudi events.
+ * Interface which defines a lightweight Metrics Registry to track Hudi events.
  */
-public class Registry {
-  ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();
-  final String name;
-
-  private static ConcurrentHashMap<String, Registry> registryMap = new 
ConcurrentHashMap<>();
+public interface Registry extends Serializable {
+  static ConcurrentHashMap<String, Registry> REGISTRYMAP = new 
ConcurrentHashMap<>();
 
-  private Registry(String name) {
-    this.name = name;
+  /**
+   * Get (or create) the registry for a provided name.
+   *
+   * This function creates a {@code LocalRegistry}.
+   *
+   * @param registryName Name of the registry
+   */
+  public static Registry getRegistry(String registryName) {

Review comment:
       Interface methods don't need an explicit `public`; the same goes for 
`static` on interface fields. Let's remove the redundant qualifiers?
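
   A minimal sketch of why the qualifiers are redundant. `SketchRegistry` and 
`InterfaceModifiersSketch` are hypothetical names used only for illustration, 
not the actual `Registry` interface from this PR. Reflection reports the same 
modifiers whether or not they are written out:

```java
import java.lang.reflect.Modifier;
import java.util.concurrent.ConcurrentHashMap;

class InterfaceModifiersSketch {

  // Hypothetical stand-in for the Registry interface in the diff above.
  interface SketchRegistry {
    // No modifiers written: the compiler treats this as public static final.
    ConcurrentHashMap<String, SketchRegistry> REGISTRY_MAP = new ConcurrentHashMap<>();

    // No modifiers written: implicitly public abstract.
    String getName();

    // Static interface method (Java 8+): implicitly public.
    static SketchRegistry getRegistry(String registryName) {
      return REGISTRY_MAP.computeIfAbsent(registryName, name -> () -> name);
    }
  }

  // Modifier bits of the REGISTRY_MAP field, as seen by reflection.
  static int fieldModifiers() {
    try {
      return SketchRegistry.class.getDeclaredField("REGISTRY_MAP").getModifiers();
    } catch (NoSuchFieldException e) {
      throw new IllegalStateException(e);
    }
  }

  // Modifier bits of a method declared on SketchRegistry.
  static int methodModifiers(String methodName, Class<?>... parameterTypes) {
    try {
      return SketchRegistry.class.getDeclaredMethod(methodName, parameterTypes).getModifiers();
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("field:       " + Modifier.toString(fieldModifiers()));
    System.out.println("getName:     " + Modifier.toString(methodModifiers("getName")));
    System.out.println("getRegistry: " + Modifier.toString(methodModifiers("getRegistry", String.class)));
  }
}
```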

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.config.HoodieCompactionConfig.Builder;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Configurations used by the HUDI Metadata Table.
+ */
+@Immutable
+public class HoodieMetadataConfig extends DefaultHoodieConfig {
+
+  public static final String METADATA_PREFIX = "hoodie.metadata";
+
+  // Enable the internal Metadata Table which saves file listings
+  public static final String METADATA_ENABLE = METADATA_PREFIX + ".enable";

Review comment:
       Please name properties using the (mostly followed) convention of ending 
the property name with `_PROP`.
   
   `METADATA_ENABLE` -> `METADATA_ENABLE_PROP`

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
##########
@@ -278,7 +305,16 @@ private void initFromFilesystem(JavaSparkContext jsc, 
HoodieTableMetaClient data
     ValidationUtils.checkState(enabled, "Metadata table cannot be initialized 
as it is not enabled");
 
     // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP 
as the instant time for initial commit
-    Option<HoodieInstant> latestInstant = 
datasetMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+    // Otherwise, we use the timestamp of the instant which does not have any 
non-completed instants before it.
+    Option<HoodieInstant> latestInstant = Option.empty();
+    boolean foundNonComplete = false;
+    for (HoodieInstant instant : datasetMetaClient.getActiveTimeline().getInstants().collect(Collectors.toList())) {

Review comment:
       stream.forEach()? 
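
   A rough sketch of what a stream-based version might look like. The loop body 
is cut off in the hunk above, so this assumes the intent is "take the last 
completed instant that precedes the first non-completed one". `SketchInstant` 
and `LatestInstantSketch` are hypothetical stand-ins, not Hudi classes. Since a 
lambda cannot assign to local variables, the state lives in atomic holders:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

class LatestInstantSketch {

  // Hypothetical stand-in for HoodieInstant: a timestamp plus a completed flag.
  static final class SketchInstant {
    final String timestamp;
    final boolean completed;

    SketchInstant(String timestamp, boolean completed) {
      this.timestamp = timestamp;
      this.completed = completed;
    }
  }

  // Returns the last completed instant that comes before the first non-completed one.
  static Optional<SketchInstant> latestBeforeFirstIncomplete(Stream<SketchInstant> timeline) {
    AtomicBoolean foundNonComplete = new AtomicBoolean(false);
    AtomicReference<SketchInstant> latest = new AtomicReference<>();
    // forEachOrdered preserves timeline order; no intermediate list is collected.
    timeline.forEachOrdered(instant -> {
      if (!instant.completed) {
        foundNonComplete.set(true);
      } else if (!foundNonComplete.get()) {
        latest.set(instant);
      }
    });
    return Optional.ofNullable(latest.get());
  }

  public static void main(String[] args) {
    List<SketchInstant> timeline = Arrays.asList(
        new SketchInstant("001", true),
        new SketchInstant("002", true),
        new SketchInstant("003", false),
        new SketchInstant("004", true));
    System.out.println(latestBeforeFirstIncomplete(timeline.stream())
        .map(i -> i.timestamp)
        .orElse("none"));
  }
}
```

   Note that `forEachOrdered` cannot stop early the way a loop with `break` 
can, so it visits the whole timeline; whether that matters here depends on the 
timeline size.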

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
##########
@@ -126,6 +134,14 @@ public static HoodieMetadataWriter instance(Configuration 
conf, HoodieWriteConfi
       // Metadata Table cannot have its metadata optimized
       ValidationUtils.checkArgument(this.config.shouldAutoCommit(), "Auto 
commit is required for Metadata Table");
       ValidationUtils.checkArgument(!this.config.useFileListingMetadata(), 
"File listing cannot be used for Metadata Table");
+
+      if (config.isMetricsOn()) {
+        if (config.isExecutorMetricsEnabled()) {

Review comment:
       combine into one if statement? 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.io.ByteBufferPool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+/**
+ * Wrapper over <code>FSDataInputStream</code> to keep track of the size of 
the written bytes.

Review comment:
       This is an input stream, so it reads bytes rather than writing them. Fix the docs?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Lightweight Metrics Registry to track Hudi events.

Review comment:
       fix docs

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
##########
@@ -95,25 +102,26 @@
   private static Map<String, HoodieMetadataWriter> instances = new HashMap<>();
 
   public static HoodieMetadataWriter instance(Configuration conf, 
HoodieWriteConfig writeConfig) {
-    try {
-      return new HoodieMetadataWriter(conf, writeConfig);
-    } catch (IOException e) {
-      throw new HoodieMetadataException("Could not initialize 
HoodieMetadataWriter", e);
+    String key = writeConfig.getBasePath();
+    if (instances.containsKey(key)) {

Review comment:
       I am still not a fan of this caching, specifically because it creates 
extra complexity like this. 

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
##########
@@ -122,6 +124,26 @@ public HoodieWriteConfig getConfig() {
   }
 
   protected HoodieTableMetaClient createMetaClient(boolean 
loadActiveTimelineOnLoad) {
+    if (config.isMetricsOn()) {
+      Registry registry;
+      Registry registryMeta;
+
+      if (config.isExecutorMetricsEnabled()) {
+        // Create a distributed registry for HoodieWrapperFileSystem
+        registry = 
Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName(),
+            DistributedRegistry.class.getName());
+        ((DistributedRegistry)registry).register(jsc);
+        registryMeta = 
Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + 
"MetaFolder",
+            DistributedRegistry.class.getName());
+        ((DistributedRegistry)registryMeta).register(jsc);
+      } else {
+        registry = 
Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName());
+        registryMeta = 
Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + 
"MetaFolder");
+      }
+
+      HoodieWrapperFileSystem.setMetricsRegistry(registry, registryMeta);

Review comment:
       Configuring this class at this layer feels ad hoc. Can we do this inside 
the HoodieWrapperFileSystem class, or as a static block there? 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
##########
@@ -18,87 +18,98 @@
 
 package org.apache.hudi.common.metrics;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hudi.common.util.ReflectionUtils;
+
 
 /**
- * Lightweight Metrics Registry to track Hudi events.
+ * Interface which defines a lightweight Metrics Registry to track Hudi events.
  */
-public class Registry {
-  ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();
-  final String name;
-
-  private static ConcurrentHashMap<String, Registry> registryMap = new 
ConcurrentHashMap<>();
+public interface Registry extends Serializable {
+  static ConcurrentHashMap<String, Registry> REGISTRYMAP = new 
ConcurrentHashMap<>();
 
-  private Registry(String name) {
-    this.name = name;
+  /**
+   * Get (or create) the registry for a provided name.
+   *
+   * This function creates a {@code LocalRegistry}.
+   *
+   * @param registryName Name of the registry
+   */
+  public static Registry getRegistry(String registryName) {

Review comment:
       same for `abstract` 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
##########
@@ -18,87 +18,98 @@
 
 package org.apache.hudi.common.metrics;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hudi.common.util.ReflectionUtils;
+
 
 /**
- * Lightweight Metrics Registry to track Hudi events.
+ * Interface which defines a lightweight Metrics Registry to track Hudi events.
  */
-public class Registry {
-  ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();
-  final String name;
-
-  private static ConcurrentHashMap<String, Registry> registryMap = new 
ConcurrentHashMap<>();
+public interface Registry extends Serializable {
+  static ConcurrentHashMap<String, Registry> REGISTRYMAP = new 
ConcurrentHashMap<>();

Review comment:
       rename: `REGISTRY_NAME` 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
##########
@@ -65,15 +66,54 @@
 
   public static final String HOODIE_SCHEME_PREFIX = "hoodie-";
 
-  private enum MetricName {
-    create, rename, delete, listStatus, mkdirs, getFileStatus, globStatus, 
listFiles
+  protected enum MetricName {
+    create, rename, delete, listStatus, mkdirs, getFileStatus, globStatus, 
listFiles, read, write
   }
 
   private ConcurrentMap<String, SizeAwareFSDataOutputStream> openStreams = new 
ConcurrentHashMap<>();
   private FileSystem fileSystem;
   private URI uri;
   private ConsistencyGuard consistencyGuard = new NoOpConsistencyGuard();
-  private Registry metricsRegistry = 
Registry.getRegistry(this.getClass().getSimpleName());
+  private static Registry metricsRegistry;
+  private static Registry metricsRegistryMetaFolder;
+
+  @FunctionalInterface
+  public interface CheckedFunction<R> {
+    R get() throws IOException;
+  }
+
+  private static Registry getMetricRegistryForPath(Path p) {
+    return ((p != null) && 
(p.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME)))
+        ? metricsRegistryMetaFolder : metricsRegistry;
+  }
+
+  protected static <R> R executeFuncWithTimeMetrics(String metricName, Path p, CheckedFunction<R> func) throws IOException {
+    long t1 = System.currentTimeMillis();

Review comment:
       Let's use the `HoodieTimer` class here? 
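
   A sketch of the shape this could take, with the timing pulled into a small 
helper instead of raw `System.currentTimeMillis()` arithmetic. `SketchTimer` 
and `COUNTERS` are hypothetical stand-ins (for `HoodieTimer` and the metrics 
registry respectively), and the metric key suffix is an assumption. This 
version also records the duration when the wrapped call throws, which may or 
may not be the desired behavior:

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

class TimedCallSketch {

  @FunctionalInterface
  interface CheckedFunction<R> {
    R get() throws IOException;
  }

  // Hypothetical timer helper standing in for HoodieTimer; not the Hudi class.
  static final class SketchTimer {
    private long startNanos;

    SketchTimer start() {
      startNanos = System.nanoTime();
      return this;
    }

    long elapsedMillis() {
      return (System.nanoTime() - startNanos) / 1_000_000L;
    }
  }

  // Hypothetical counters keyed by metric name, standing in for a Registry.
  static final Map<String, LongAdder> COUNTERS = new ConcurrentHashMap<>();

  static <R> R executeWithTimeMetrics(String metricName, CheckedFunction<R> func) throws IOException {
    SketchTimer timer = new SketchTimer().start();
    try {
      return func.get();
    } finally {
      // Record one invocation and its duration, even if func threw.
      COUNTERS.computeIfAbsent(metricName, k -> new LongAdder()).increment();
      COUNTERS.computeIfAbsent(metricName + ".totalDuration", k -> new LongAdder())
          .add(timer.elapsedMillis());
    }
  }

  public static void main(String[] args) throws IOException {
    String result = executeWithTimeMetrics("create", () -> "done");
    System.out.println(result + " " + COUNTERS.get("create").sum());
  }
}
```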

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.config.HoodieCompactionConfig.Builder;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Configurations used by the HUDI Metadata Table.
+ */
+@Immutable
+public class HoodieMetadataConfig extends DefaultHoodieConfig {
+
+  public static final String METADATA_PREFIX = "hoodie.metadata";
+
+  // Enable the internal Metadata Table which saves file listings
+  public static final String METADATA_ENABLE = METADATA_PREFIX + ".enable";
+  public static final boolean DEFAULT_METADATA_ENABLE = false;
+
+  // Validate contents of Metadata Table on each access against the actual 
filesystem
+  public static final String METADATA_VALIDATE = METADATA_PREFIX + ".validate";
+  public static final boolean DEFAULT_METADATA_VALIDATE = false;
+
+  // Parallelism for inserts
+  public static final String INSERT_PARALLELISM = METADATA_PREFIX + 
".insert.parallelism";
+  public static final int DEFAULT_INSERT_PARALLELISM = 1;
+
+  // Async clean
+  public static final String ASYNC_CLEAN = METADATA_PREFIX + ".clean.async";
+  public static final boolean DEFAULT_ASYNC_CLEAN = false;
+
+  // Maximum delta commits before compaction occurs
+  public static final String COMPACT_NUM_DELTA_COMMITS = METADATA_PREFIX + 
".compact.max.delta.commits";
+  public static final int DEFAULT_COMPACT_NUM_DELTA_COMMITS = 24;
+
+  // Archival settings
+  public static final String MIN_COMMITS_TO_KEEP = METADATA_PREFIX + 
".keep.min.commits";
+  public static final int DEFAULT_MIN_COMMITS_TO_KEEP = 20;
+  public static final String MAX_COMMITS_TO_KEEP = METADATA_PREFIX + 
".keep.max.commits";
+  public static final int DEFAULT_MAX_COMMITS_TO_KEEP = 30;
+
+  // Cleaner commits retained
+  public static final String CLEANER_COMMITS_RETAINED = METADATA_PREFIX + 
".cleaner.commits.retained";
+  public static final int DEFAULT_CLEANER_COMMITS_RETAINED = 3;
+
+  private HoodieMetadataConfig(Properties props) {
+    super(props);
+  }
+
+  public static HoodieMetadataConfig.Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+
+    private final Properties props = new Properties();
+
+    public Builder fromFile(File propertiesFile) throws IOException {
+      try (FileReader reader = new FileReader(propertiesFile)) {
+        this.props.load(reader);
+        return this;
+      }
+    }
+
+    public Builder fromProperties(Properties props) {
+      this.props.putAll(props);
+      return this;
+    }
+
+    public Builder enable(boolean enable) {
+      props.setProperty(METADATA_ENABLE, String.valueOf(enable));
+      return this;
+    }
+
+    public Builder validate(boolean validate) {
+      props.setProperty(METADATA_VALIDATE, String.valueOf(validate));
+      return this;
+    }
+
+    public Builder withInsertParallelism(int parallelism) {
+      props.setProperty(INSERT_PARALLELISM, String.valueOf(parallelism));
+      return this;
+    }
+
+    public Builder withAsyncClean(boolean asyncClean) {
+      props.setProperty(ASYNC_CLEAN, String.valueOf(asyncClean));
+      return this;
+    }
+
+    public Builder withMaxNumDeltaCommitsBeforeCompaction(int 
maxNumDeltaCommitsBeforeCompaction) {
+      props.setProperty(COMPACT_NUM_DELTA_COMMITS, 
String.valueOf(maxNumDeltaCommitsBeforeCompaction));
+      return this;
+    }
+
+    public Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
+      props.setProperty(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep));
+      props.setProperty(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep));
+      return this;
+    }
+
+    public Builder retainCommits(int commitsRetained) {
+      props.setProperty(CLEANER_COMMITS_RETAINED, 
String.valueOf(commitsRetained));
+      return this;
+    }
+
+    public HoodieMetadataConfig build() {

Review comment:
       FWIW, I still prefer honoring all table/write configs for the metadata 
table. This file could then just hold different defaults, for example. 
Redefining every param here again is going to be cumbersome. 
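
   To illustrate the idea, here is a sketch using the defaults chaining built 
into `java.util.Properties`. The keys and the write-side default values are 
hypothetical; only the metadata-side values 1 and 3 are taken from the 
constants in the diff above. Lookups fall through from user-supplied values, to 
metadata-specific defaults, to the general write defaults:

```java
import java.util.Properties;

class LayeredConfigSketch {

  // Hypothetical keys; the real Hudi property names may differ.
  static final String INSERT_PARALLELISM = "sketch.insert.parallelism";
  static final String CLEANER_COMMITS_RETAINED = "sketch.cleaner.commits.retained";
  static final String MIN_COMMITS_TO_KEEP = "sketch.keep.min.commits";

  // General write-config defaults (illustrative values, not Hudi's).
  static Properties writeDefaults() {
    Properties p = new Properties();
    p.setProperty(INSERT_PARALLELISM, "100");
    p.setProperty(CLEANER_COMMITS_RETAINED, "10");
    p.setProperty(MIN_COMMITS_TO_KEEP, "20");
    return p;
  }

  // Metadata-table layer: only the defaults that differ are redefined.
  static Properties metadataDefaults() {
    Properties p = new Properties(writeDefaults());
    p.setProperty(INSERT_PARALLELISM, "1");
    p.setProperty(CLEANER_COMMITS_RETAINED, "3");
    return p;
  }

  // User-supplied values win over both default layers.
  static Properties metadataConfig(Properties userProps) {
    Properties p = new Properties(metadataDefaults());
    p.putAll(userProps);
    return p;
  }

  public static void main(String[] args) {
    Properties user = new Properties();
    user.setProperty(INSERT_PARALLELISM, "4");
    Properties config = metadataConfig(user);
    System.out.println(config.getProperty(INSERT_PARALLELISM));
    System.out.println(config.getProperty(CLEANER_COMMITS_RETAINED));
    System.out.println(config.getProperty(MIN_COMMITS_TO_KEEP));
  }
}
```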

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
##########
@@ -95,25 +102,26 @@
   private static Map<String, HoodieMetadataWriter> instances = new HashMap<>();
 
   public static HoodieMetadataWriter instance(Configuration conf, 
HoodieWriteConfig writeConfig) {

Review comment:
       rename: create() per standard factory pattern conventions? 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.io.ByteBufferPool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+/**
+ * Wrapper over <code>FSDataInputStream</code> to keep track of the size of 
the written bytes.
+ */
+public class SizeAwareFSDataInputStream extends FSDataInputStream {

Review comment:
       Please name this something like `TimedFSDataInputStream`. This is not 
tracking size at all 
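
   For reference, a stripped-down sketch of what a "timed" stream wrapper does, 
written against plain `java.io` rather than Hadoop's `FSDataInputStream`. 
`TimedInputStream` and its accessor names are hypothetical and are not the 
class in this PR; the point is only that the wrapper measures time spent in 
reads and never looks at sizes:

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicLong;

class TimedInputStreamSketch {

  // Wraps any InputStream and accumulates the time spent inside read calls.
  static final class TimedInputStream extends FilterInputStream {
    private final AtomicLong readNanos = new AtomicLong();
    private final AtomicLong readCalls = new AtomicLong();

    TimedInputStream(InputStream in) {
      super(in);
    }

    @Override
    public int read() throws IOException {
      long start = System.nanoTime();
      try {
        return super.read();
      } finally {
        record(start);
      }
    }

    // read(byte[]) in FilterInputStream delegates here, so it is timed as well.
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      long start = System.nanoTime();
      try {
        return super.read(b, off, len);
      } finally {
        record(start);
      }
    }

    private void record(long startNanos) {
      readNanos.addAndGet(System.nanoTime() - startNanos);
      readCalls.incrementAndGet();
    }

    long totalReadNanos() {
      return readNanos.get();
    }

    long totalReadCalls() {
      return readCalls.get();
    }
  }

  public static void main(String[] args) throws IOException {
    try (TimedInputStream in = new TimedInputStream(new ByteArrayInputStream(new byte[] {1, 2, 3}))) {
      in.read();
      in.read(new byte[2]);
      System.out.println(in.totalReadCalls() + " reads, " + in.totalReadNanos() + " ns");
    }
  }
}
```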

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -232,6 +232,11 @@ private HoodieTable 
getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOp
     return table;
   }
 
+  protected HoodieTable<T> getTable() {

Review comment:
       This is creating a new table, not fetching an existing member. Any 
reason we can't just use the `HoodieTable.create` factory? It took me quite a 
bit of time to clean up usages last time around. :) 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
##########
@@ -164,79 +205,99 @@ private FSDataOutputStream wrapOutputStream(final Path 
path, FSDataOutputStream
     return os;
   }
 
+  private FSDataInputStream wrapInputStream(final Path path, FSDataInputStream 
fsDataInputStream) throws IOException {
+    if (fsDataInputStream instanceof SizeAwareFSDataInputStream) {
+      return fsDataInputStream;
+    }
+
+    SizeAwareFSDataInputStream os = new SizeAwareFSDataInputStream(path, fsDataInputStream);

Review comment:
       rename: `is` (this wraps an input stream, not an output stream)

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -232,6 +232,11 @@ private HoodieTable 
getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOp
     return table;
   }
 
+  protected HoodieTable<T> getTable() {

Review comment:
       Let's avoid this method, please. 

##########
File path: 
hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
##########
@@ -457,14 +479,14 @@ public void testArchivingAndCompaction() throws Exception 
{
     init(HoodieTableType.COPY_ON_WRITE);
 
     final int maxDeltaCommitsBeforeCompaction = 6;
+    // Test autoClean and asyncClean based on this flag which is randomly 
chosen.
+    boolean asyncClean = new Random().nextBoolean();

Review comment:
       This kind of randomization makes the test non-deterministic. Please make 
this parameterized. 
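
   In JUnit 5 this would typically be a `@ParameterizedTest` with 
`@ValueSource(booleans = {true, false})`. The dependency-free sketch below 
shows the same idea: every run exercises both values of the flag in a fixed 
order, instead of one value picked at random. `runScenario` is a hypothetical 
placeholder for the body of `testArchivingAndCompaction`:

```java
import java.util.ArrayList;
import java.util.List;

class BothCleanModesSketch {

  // Hypothetical placeholder for the real test body; reports which mode it exercised.
  static String runScenario(boolean asyncClean) {
    return asyncClean ? "asyncClean" : "autoClean";
  }

  // Runs the scenario once per flag value, always in the same order.
  static List<String> runAll() {
    List<String> exercised = new ArrayList<>();
    for (boolean asyncClean : new boolean[] {true, false}) {
      exercised.add(runScenario(asyncClean));
    }
    return exercised;
  }

  public static void main(String[] args) {
    System.out.println(runAll());
  }
}
```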




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

