danny0405 commented on code in PR #9209:
URL: https://github.com/apache/hudi/pull/9209#discussion_r1305303946


##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/LSMTimeline.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.timeline;
+
+import org.apache.hudi.common.model.HoodieLSMTimelineManifest;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ArchivedInstantReadSchemas;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Represents the LSM Timeline for the Hoodie table.
+ *
+ * <p>After several instants are accumulated as a batch on the active 
timeline, they would be archived as a parquet file into the archived timeline.
+ * In general the archived timeline is comprised with parquet files with LSM 
style file layout. Each new operation to the archived timeline generates
+ * a new snapshot version. Theoretically, there could be multiple snapshot 
versions on the archived timeline.
+ *
+ * <p><h2>The Archived Timeline Layout</h2>
+ *
+ * <pre>
+ *   t111, t112 ... t120 ... ->
+ *     \              /
+ *        \        /
+ *            |
+ *            V
+ *   t111_t120_0.parquet, t101_t110_0.parquet,...  t11_t20_0.parquet    L0
+ *                                  \                    /
+ *                                     \              /
+ *                                            |
+ *                                            V
+ *                                    t11_t100_1.parquet                L1
+ *
+ *      manifest_1, manifest_2, ... manifest_12
+ *                                      |
+ *                                      V
+ *                                  _version_
+ * </pre>
+ *
+ * <p><h2>The LSM Tree Compaction</h2>
+ * Use the universal compaction strategy, that is: when N(by default 10) 
number of parquet files exist in the current layer, they are merged and flush 
as a compacted file in the next layer.
+ * We have no limit for the layer number, assumes there are 10 instants for 
each file in L0, there could be 100 instants per file in L1,
+ * so 3000 instants could be represented as 3 parquets in L2, it is pretty 
fast if we use concurrent read.
+ *
+ * <p>The benchmark shows 1000 instants read cost about 10 ms.
+ *
+ * <p><h2>The Archiver & Reader Snapshot Isolation</h2>
+ *
+ * <p>In order to make snapshot isolation of the archived timeline write/read, 
we add two kinds of metadata files for the LSM tree version management:
+ * <ol>
+ *   <li>Manifest file: Each new file in layer 0 or each compaction would 
generate a new manifest file, the manifest file records the valid file handles 
of the latest snapshot;</li>
+ *   <li>Version file: A version file is generated right after a complete 
manifest file is formed.</li>
+ * </ol>
+ *
+ * <p><h2>The Reader Workflow</h2>
+ * <ul>
+ *   <li>read the latest version;</li>
+ *   <li>read the manifest file for valid file handles;</li>
+ *   <li>probably do a data skipping with the parquet file name max min 
timestamp.</li>
+ * </ul>
+ *
+ * <p><h2>The Legacy Files Cleaning and Read Retention</h2>
+ * Only triggers file cleaning after a valid compaction.
+ *
+ * <p><h3>Clean Strategy</h3></p>
+ * Keeps only 3 valid snapshot versions for the reader, that means, a file is 
kept for at lest 3 archival trigger interval, for default configuration, it is 
30 instants time span,
+ * which is far longer that the archived timeline loading time.
+ *
+ * <p><h3>Instants TTL</h3></p>
+ * The timeline reader only reads instants of last limited days. We will by 
default skip the instants from archived timeline that are generated long time 
ago.
+ */
+public class LSMTimeline {
+  private static final Logger LOG = LoggerFactory.getLogger(LSMTimeline.class);
+
+  private static final String VERSION_FILE_NAME = "_version_";    // _version_
+  private static final String MANIFEST_FILE_PREFIX = "manifest_"; // 
manifest_[N]
+
+  private static final String TEMP_FILE_SUFFIX = ".tmp";
+
+  private static final Pattern ARCHIVE_FILE_PATTERN =
+      Pattern.compile("^(\\d+)_(\\d+)_(\\d)\\.parquet");
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+  public static Schema getReadSchema(HoodieArchivedTimeline.LoadMode loadMode) 
{
+    switch (loadMode) {
+      case SLIM:
+        return ArchivedInstantReadSchemas.TIMELINE_LSM_SLIM_READ_SCHEMA;
+      case METADATA:
+        return 
ArchivedInstantReadSchemas.TIMELINE_LSM_READ_SCHEMA_WITH_METADATA;
+      case PLAN:
+        return ArchivedInstantReadSchemas.TIMELINE_LSM_READ_SCHEMA_WITH_PLAN;
+      default:
+        throw new AssertionError("Unexpected");
+    }
+  }
+
+  /**
+   * Returns whether the given file is located in the filter.
+   */
+  public static boolean isFileInRange(HoodieArchivedTimeline.TimeRangeFilter 
filter, String fileName) {
+    String minInstant = getMinInstantTime(fileName);
+    String maxInstant = getMaxInstantTime(fileName);
+    return filter.isInRange(minInstant) || filter.isInRange(maxInstant);
+  }
+
+  /**
+   * Returns the latest snapshot version.
+   */
+  public static int latestSnapshotVersion(HoodieTableMetaClient metaClient) 
throws IOException {
+    Path versionFilePath = getVersionFilePath(metaClient);
+    if (metaClient.getFs().exists(versionFilePath)) {
+      try {
+        Option<byte[]> content = 
FileIOUtils.readDataFromPath(metaClient.getFs(), versionFilePath);
+        if (content.isPresent()) {
+          return Integer.parseInt(new String(content.get(), 
StandardCharsets.UTF_8));
+        }
+      } catch (Exception e) {
+        // fallback to manifest file listing.
+        LOG.warn("Error reading version file {}", versionFilePath, e);
+      }
+    }
+    return 
allSnapshotVersions(metaClient).stream().max(Integer::compareTo).orElse(-1);
+  }
+
+  /**
+   * Returns all the valid snapshot versions.
+   */
+  public static List<Integer> allSnapshotVersions(HoodieTableMetaClient 
metaClient) throws IOException {
+    return Arrays.stream(metaClient.getFs().listStatus(new 
Path(metaClient.getArchivePath()), getManifestFilePathFilter()))
+        .map(fileStatus -> fileStatus.getPath().getName())
+        .map(LSMTimeline::getManifestVersion)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Returns the latest snapshot metadata files.
+   */
+  public static HoodieLSMTimelineManifest 
latestSnapshotManifest(HoodieTableMetaClient metaClient) throws IOException {
+    int latestVersion = latestSnapshotVersion(metaClient);
+    return latestSnapshotManifest(metaClient, latestVersion);
+  }
+
+  /**
+   * Reads the file list from the manifest file for the latest snapshot.
+   */
+  public static HoodieLSMTimelineManifest 
latestSnapshotManifest(HoodieTableMetaClient metaClient, int latestVersion) {
+    if (latestVersion < 0) {
+      // there is no valid snapshot of the timeline.
+      return HoodieLSMTimelineManifest.EMPTY;
+    }
+    // read and deserialize the valid files.
+    byte[] content = FileIOUtils.readDataFromPath(metaClient.getFs(), 
getManifestFilePath(metaClient, latestVersion)).get();
+    try {
+      return HoodieLSMTimelineManifest.fromJsonString(new String(content, 
StandardCharsets.UTF_8), HoodieLSMTimelineManifest.class);
+    } catch (Exception e) {
+      throw new HoodieException("Error deserializing manifest entries", e);
+    }
+  }
+
+  /**
+   * Returns the full manifest file path with given version number.
+   */
+  public static Path getManifestFilePath(HoodieTableMetaClient metaClient, int 
snapshotVersion) {
+    return new Path(metaClient.getArchivePath(), MANIFEST_FILE_PREFIX + 
snapshotVersion);
+  }
+
+  /**
+   * Returns the full version file path with given version number.
+   */
+  public static Path getVersionFilePath(HoodieTableMetaClient metaClient) {
+    return new Path(metaClient.getArchivePath(), VERSION_FILE_NAME);
+  }
+
+  /**
+   * List all the parquet manifest files.
+   */
+  public static FileStatus[] listAllManifestFiles(HoodieTableMetaClient 
metaClient) throws IOException {
+    return metaClient.getFs().listStatus(new 
Path(metaClient.getArchivePath()), getManifestFilePathFilter());
+  }
+
+  /**
+   * List all the parquet metadata files.
+   */
+  public static FileStatus[] listAllMetaFiles(HoodieTableMetaClient 
metaClient) throws IOException {
+    return metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/*.parquet"));
+  }
+
+  /**
+   * Parse the snapshot version from the manifest file name.
+   */
+  public static int getManifestVersion(String fileName) {
+    return Integer.parseInt(fileName.split("_")[1]);
+  }
+
+  /**
+   * Parse the layer number from the file name.
+   */
+  public static int getFileLayer(String fileName) {
+    try {
+      Matcher fileMatcher = ARCHIVE_FILE_PATTERN.matcher(fileName);
+      if (fileMatcher.matches()) {
+        return Integer.parseInt(fileMatcher.group(3));
+      }
+    } catch (NumberFormatException e) {
+      // log and ignore any format warnings
+      LOG.warn("error getting file layout for archived file: " + fileName);
+    }
+
+    // return default value in case of any errors
+    return 0;
+  }
+
+  /**
+   * Parse the minimum instant time from the file name.
+   */
+  public static String getMinInstantTime(String fileName) {

Review Comment:
   Did see much gains here, we can extend it in the near future if we have more 
complex name parsing on the file names.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to