This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch rfc-60-ossstorage-poc
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 8eab65592a5972dc548046d4442932f94a42e4cb
Author: zhangyue143 <[email protected]>
AuthorDate: Wed Dec 11 11:14:21 2024 +0800

    poc finished
---
 .../org/apache/hudi/io/HoodieCreateHandle.java     |  3 +-
 .../java/org/apache/hudi/io/HoodieWriteHandle.java | 18 ++++++-
 .../table/view/AbstractTableFileSystemView.java    |  9 +++-
 .../apache/hudi/metadata/BaseTableMetadata.java    |  6 ++-
 .../hudi/metadata/HoodieMetadataPayload.java       | 15 +++++-
 .../hudi/storage/HoodieOSSStorageStrategy.java     | 62 ++++++++++++++++++++++
 .../org/apache/hudi/storage/HoodieStorage.java     | 17 ++++++
 .../apache/hudi/storage/HoodieStorageStrategy.java | 51 ++++++++++++++++++
 .../spark/sql/hudi/dml/TestInsertTable.scala       | 36 +++++++++++++
 9 files changed, 209 insertions(+), 8 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 12406927ae6..d69630edf47 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -96,8 +96,7 @@ public class HoodieCreateHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
 
     try {
       HoodiePartitionMetadata partitionMetadata = new 
HoodiePartitionMetadata(storage, instantTime,
-          new StoragePath(config.getBasePath()),
-          FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath),
+          new StoragePath(config.getBasePath()), 
getPartitionPath(partitionPath),
           hoodieTable.getPartitionMetafileFormat());
       partitionMetadata.trySave();
       createMarkerFile(partitionPath,
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 73f05789083..882bd2e64e9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -38,6 +38,7 @@ import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.HoodieOSSStorageStrategy;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
@@ -46,11 +47,13 @@ import org.apache.hudi.table.marker.WriteMarkersFactory;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
@@ -112,8 +115,21 @@ public abstract class HoodieWriteHandle<T, I, K, O> 
extends HoodieIOHandle<T, I,
     return FSUtils.makeWriteToken(getPartitionId(), getStageId(), 
getAttemptId());
   }
 
+  protected StoragePath getPartitionPath(String partitionPath) {
+    boolean isMDT = 
hoodieTable.getMetaClient().getTableConfig().getTableName().contains("metadata");
+    if (isMDT) {
+      return FSUtils.constructAbsolutePath(config.getBasePath(), 
partitionPath);
+    } else {
+      HashMap<String, String> configMap = new HashMap<>();
+      Path basePath = new Path(config.getBasePath());
+      configMap.put(HoodieOSSStorageStrategy.FILE_ID_KEY, fileId);
+      configMap.put(HoodieOSSStorageStrategy.TABLE_BASE_PATH, 
basePath.toUri().getPath());
+      return storage.storageLocation(partitionPath, configMap);
+    }
+  }
+
   public StoragePath makeNewPath(String partitionPath) {
-    StoragePath path = FSUtils.constructAbsolutePath(config.getBasePath(), 
partitionPath);
+    StoragePath path = getPartitionPath(partitionPath);
     try {
       if (!storage.exists(path)) {
         storage.createDirectory(path); // create a new partition as needed.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index bcc32e01064..99682ae4ad3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -44,6 +44,8 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.HoodieOSSStorageStrategy;
+import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 
@@ -162,8 +164,13 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
    * If the file statuses are limited to a single partition, use {@link 
#addFilesToView(String, List)} instead.
    */
   public List<HoodieFileGroup> addFilesToView(List<StoragePathInfo> statuses) {
+    HoodieStorage storage = metaClient.getStorage();
+    HashMap<String, String> config = new HashMap<>();
+    config.put(HoodieOSSStorageStrategy.TABLE_NAME, 
metaClient.getTableConfig().getTableName());
     Map<String, List<StoragePathInfo>> statusesByPartitionPath = 
statuses.stream()
-        .collect(Collectors.groupingBy(fileStatus -> 
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), 
fileStatus.getPath().getParent())));
+        .collect(Collectors.groupingBy(fileStatus ->
+            
storage.getRelativePath(fileStatus.getPath().getParent().getPathWithoutSchemeAndAuthority().toUri().getPath(),
 config)
+            .toUri().getPath()));
     return statusesByPartitionPath.entrySet().stream().map(entry -> 
addFilesToView(entry.getKey(), entry.getValue()))
         .flatMap(List::stream).collect(Collectors.toList());
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 4aea9eeb356..89e0f013bf4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -392,7 +392,7 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
           HoodieMetadataPayload metadataPayload = record.getData();
           checkForSpuriousDeletes(metadataPayload, recordKey);
           try {
-            return metadataPayload.getFileList(dataMetaClient.getStorage(), 
partitionPath);
+            return metadataPayload.getFileList(dataMetaClient, partitionPath);
           } catch (Exception e) {
             throw new HoodieException("Failed to extract file-pathInfoList 
from the payload", e);
           }
@@ -404,6 +404,7 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
   }
 
   Map<String, List<StoragePathInfo>> 
fetchAllFilesInPartitionPaths(List<StoragePath> partitionPaths) {
+    // Example input partition path:
+    // file:/private/var/folders/ww/dc150vl50815wqgbsx_98w9w0000gp/T/spark-a496632e-e844-4103-9fde-cc98e1afdca2/h1/dt=2021-01-06
     Map<String, StoragePath> partitionIdToPathMap =
         partitionPaths.parallelStream()
             .collect(
@@ -415,6 +416,7 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
             );
 
     HoodieTimer timer = HoodieTimer.start();
+    // Example mapping: dt=2021-01-05 ->
+    // "file:/private/var/folders/ww/dc150vl50815wqgbsx_98w9w0000gp/T/spark-a496632e-e844-4103-9fde-cc98e1afdca2/h1/dt=2021-01-05"
     Map<String, HoodieRecord<HoodieMetadataPayload>> partitionIdRecordPairs =
         getRecordsByKeys(new ArrayList<>(partitionIdToPathMap.keySet()),
             MetadataPartitionType.FILES.getPartitionPath());
@@ -430,7 +432,7 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
               HoodieMetadataPayload metadataPayload = e.getValue().getData();
               checkForSpuriousDeletes(metadataPayload, partitionId);
 
-              List<StoragePathInfo> files = 
metadataPayload.getFileList(dataMetaClient.getStorage(), partitionPath);
+              List<StoragePathInfo> files = 
metadataPayload.getFileList(dataMetaClient, partitionPath);
               return Pair.of(partitionPath.toString(), files);
             })
         .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index b0bb9670825..e24e1a0e464 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.hash.ColumnIndexID;
@@ -39,6 +40,7 @@ import org.apache.hudi.common.util.hash.FileIndexID;
 import org.apache.hudi.common.util.hash.PartitionIndexID;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.storage.HoodieOSSStorageStrategy;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
@@ -47,6 +49,7 @@ import org.apache.hudi.util.Lazy;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
 
 import javax.annotation.Nullable;
 
@@ -429,13 +432,21 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
   /**
    * Returns the files added as part of this record.
    */
-  public List<StoragePathInfo> getFileList(HoodieStorage storage, StoragePath 
partitionPath) {
+  public List<StoragePathInfo> getFileList(HoodieTableMetaClient 
dataMetaClient, StoragePath partitionPath) {
+    HoodieStorage storage = dataMetaClient.getStorage();
+    String tableName = dataMetaClient.getTableConfig().getTableName();
+    StoragePath dataBasePath = dataMetaClient.getBasePath();
     long blockSize = storage.getDefaultBlockSize(partitionPath);
     return filterFileInfoEntries(false)
         .map(e -> {
           // NOTE: Since we know that the Metadata Table's Payload is simply a 
file-name we're
           //       creating Hadoop's Path using more performant unsafe variant
-          return new StoragePathInfo(new StoragePath(partitionPath, 
e.getKey()), e.getValue().getSize(),
+          HashMap<String, String> config = new HashMap<>();
+          config.put(HoodieOSSStorageStrategy.FILE_ID_KEY, 
FSUtils.getFileId(new Path(e.getKey()).getName()));
+          config.put(HoodieOSSStorageStrategy.TABLE_BASE_PATH, 
dataBasePath.getPathWithoutSchemeAndAuthority().toUri().getPath());
+          StoragePath part = 
storage.storageLocation(FSUtils.getRelativePartitionPath(dataBasePath, 
partitionPath), config);
+          StoragePath filePath = new StoragePath(part, new 
Path(e.getKey()).getName());
+          return new StoragePathInfo(filePath, e.getValue().getSize(),
               false, (short) 0, blockSize, 0);
         })
         .collect(Collectors.toList());
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieOSSStorageStrategy.java 
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieOSSStorageStrategy.java
new file mode 100644
index 00000000000..90fe64e242c
--- /dev/null
+++ 
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieOSSStorageStrategy.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.storage;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class HoodieOSSStorageStrategy implements HoodieStorageStrategy {
+
+  public static final String FILE_ID_KEY = "hoodie_file_id";
+  public static final String TABLE_BASE_PATH = "hoodie_table_base_path";
+  public static final String TABLE_NAME = "hoodie_table_name";
+  public static final String TABLE_STORAGE_PATH = "hoodie_storage_path";
+  private final String hoodieStoragePath;
+  private final String basePath;
+  public HoodieOSSStorageStrategy(StorageConfiguration<?> storageConf) {
+    this.hoodieStoragePath = "";
+    this.basePath = "";
+  }
+
+  @Override
+  public StoragePath storageLocation(String path, Map<String, String> 
configMap) {
+    String fileID = configMap.get(FILE_ID_KEY);
+    String basePath = configMap.get(TABLE_BASE_PATH);
+    String storagePath = configMap.get(TABLE_STORAGE_PATH);
+    int hash = (path + fileID).hashCode() & Integer.MAX_VALUE;
+    String urlString = "/tmp/bucketA/" + hoodieStoragePath + "/" + hash + "/" 
+ basePath + "/" + path;
+    return new StoragePath(urlString);
+  }
+
+  @Override
+  public Set<StoragePath> getAllLocations(String partitionPath, boolean 
checkExist) {
+    HashSet<StoragePath> res = new HashSet<>();
+    res.add( new StoragePath(basePath, partitionPath));
+    return res;
+  }
+
+  @Override
+  public StoragePath getRelativePath(String path, Map<String, String> 
configMap) {
+    String tableName = configMap.get(TABLE_NAME);
+    String[] res = path.split(tableName);
+    return new StoragePath(res[res.length - 1].replace("/", ""));
+  }
+}
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java 
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
index 8de04f6903e..4d25f18e71a 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
@@ -37,6 +37,8 @@ import java.io.OutputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 /**
@@ -48,9 +50,24 @@ public abstract class HoodieStorage implements Closeable {
   public static final Logger LOG = 
LoggerFactory.getLogger(HoodieStorage.class);
 
   protected final StorageConfiguration<?> storageConf;
+  protected final HoodieOSSStorageStrategy ossStorageStrategy;
 
   public HoodieStorage(StorageConfiguration<?> storageConf) {
     this.storageConf = storageConf;
+    this.ossStorageStrategy = new HoodieOSSStorageStrategy(storageConf);
+
+  }
+
+  public StoragePath  storageLocation(String path, Map<String, String> 
configMap) {
+    return ossStorageStrategy.storageLocation(path, configMap);
+  }
+
+  public Set<StoragePath> getAllLocations(String partitionPath, boolean 
checkExist) {
+    return ossStorageStrategy.getAllLocations(partitionPath, checkExist);
+  }
+
+  public StoragePath getRelativePath(String path, Map<String, String> 
configMap) {
+    return ossStorageStrategy.getRelativePath(path, configMap);
   }
 
   /**
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorageStrategy.java 
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorageStrategy.java
new file mode 100644
index 00000000000..9b9609a67eb
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorageStrategy.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.storage;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface HoodieStorageStrategy {
+  /**
+   * Return a storage location for the given path
+   *
+   * @param path
+   * @param configMap
+   * @return the storage location with the appropriate prefix applied to the path
+   */
+  StoragePath storageLocation(String path, Map<String, String> configMap);
+
+  /**
+   * Return all possible StoragePaths
+   *
+   * @param partitionPath
+   * @param checkExist whether to verify that each StoragePath actually exists.
+   * @return a set of storage partition paths
+   */
+  Set<StoragePath> getAllLocations(String partitionPath, boolean checkExist);
+
+  /**
+   * Return the relative path based on the given path and locations.
+   *
+   * @param path
+   * @return relative path
+   */
+  StoragePath getRelativePath(String path, Map<String, String> configMap);
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index b8912d58c5c..d167c4d6c8e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -189,6 +189,42 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     })
   }
 
+  test("Test Insert Into with values and OSS storage") {
+    withRecordType()(withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create a partitioned table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  dt string,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | tblproperties (primaryKey = 'id', hoodie.storage.path = 
'/tmp/bucketA/')
+           | partitioned by (dt)
+           | location '${tmp.getCanonicalPath}/${tableName}'
+       """.stripMargin)
+
+      println(s"""${tmp.getCanonicalPath}/${tableName}""")
+      // Note: do not specify a column list; the partition field must be 
placed last.
+      spark.sql(
+        s"""
+           | insert into $tableName values
+           | (1, 'a1', 10, 1000, "2021-01-05"),
+           | (2, 'a2', 20, 2000, "2021-01-06"),
+           | (3, 'a3', 30, 3000, "2021-01-07")
+              """.stripMargin)
+
+      checkAnswer(s"select id, name, price, ts, dt from $tableName where 
year(dt) > '2020' and lower(name) > 'a0'")(
+        Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+        Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+        Seq(3, "a3", 30.0, 3000, "2021-01-07")
+      )
+    })
+  }
+
   test("Test Insert Into with static partition") {
     Seq("cow", "mor").foreach { tableType =>
       withTempDir { tmp =>

Reply via email to