This is an automated email from the ASF dual-hosted git repository. zhangyue19921010 pushed a commit to branch rfc-60-ossstorage-poc in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 8eab65592a5972dc548046d4442932f94a42e4cb Author: zhangyue143 <[email protected]> AuthorDate: Wed Dec 11 11:14:21 2024 +0800 poc finished --- .../org/apache/hudi/io/HoodieCreateHandle.java | 3 +- .../java/org/apache/hudi/io/HoodieWriteHandle.java | 18 ++++++- .../table/view/AbstractTableFileSystemView.java | 9 +++- .../apache/hudi/metadata/BaseTableMetadata.java | 6 ++- .../hudi/metadata/HoodieMetadataPayload.java | 15 +++++- .../hudi/storage/HoodieOSSStorageStrategy.java | 62 ++++++++++++++++++++++ .../org/apache/hudi/storage/HoodieStorage.java | 17 ++++++ .../apache/hudi/storage/HoodieStorageStrategy.java | 51 ++++++++++++++++++ .../spark/sql/hudi/dml/TestInsertTable.scala | 36 +++++++++++++ 9 files changed, 209 insertions(+), 8 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 12406927ae6..d69630edf47 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -96,8 +96,7 @@ public class HoodieCreateHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O try { HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(storage, instantTime, - new StoragePath(config.getBasePath()), - FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath), + new StoragePath(config.getBasePath()), getPartitionPath(partitionPath), hoodieTable.getPartitionMetafileFormat()); partitionMetadata.trySave(); createMarkerFile(partitionPath, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 73f05789083..882bd2e64e9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -38,6 +38,7 @@ import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.storage.HoodieOSSStorageStrategy; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; @@ -46,11 +47,13 @@ import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.List; import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; @@ -112,8 +115,21 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I, return FSUtils.makeWriteToken(getPartitionId(), getStageId(), getAttemptId()); } + protected StoragePath getPartitionPath(String partitionPath) { + boolean isMDT = hoodieTable.getMetaClient().getTableConfig().getTableName().contains("metadata"); + if (isMDT) { + return FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath); + } else { + HashMap<String, String> configMap = new HashMap<>(); + Path basePath = new Path(config.getBasePath()); + configMap.put(HoodieOSSStorageStrategy.FILE_ID_KEY, fileId); + configMap.put(HoodieOSSStorageStrategy.TABLE_BASE_PATH, basePath.toUri().getPath()); + return storage.storageLocation(partitionPath, configMap); + } + } + public StoragePath makeNewPath(String partitionPath) { - StoragePath path = FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath); + StoragePath path = getPartitionPath(partitionPath); try { if (!storage.exists(path)) { storage.createDirectory(path); // create a new partition as needed. 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index bcc32e01064..99682ae4ad3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -44,6 +44,8 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.storage.HoodieOSSStorageStrategy; +import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathInfo; @@ -162,8 +164,13 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV * If the file statuses are limited to a single partition, use {@link #addFilesToView(String, List)} instead. 
*/ public List<HoodieFileGroup> addFilesToView(List<StoragePathInfo> statuses) { + HoodieStorage storage = metaClient.getStorage(); + HashMap<String, String> config = new HashMap<>(); + config.put(HoodieOSSStorageStrategy.TABLE_NAME, metaClient.getTableConfig().getTableName()); Map<String, List<StoragePathInfo>> statusesByPartitionPath = statuses.stream() - .collect(Collectors.groupingBy(fileStatus -> FSUtils.getRelativePartitionPath(metaClient.getBasePath(), fileStatus.getPath().getParent()))); + .collect(Collectors.groupingBy(fileStatus -> + storage.getRelativePath(fileStatus.getPath().getParent().getPathWithoutSchemeAndAuthority().toUri().getPath(), config) + .toUri().getPath())); return statusesByPartitionPath.entrySet().stream().map(entry -> addFilesToView(entry.getKey(), entry.getValue())) .flatMap(List::stream).collect(Collectors.toList()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 4aea9eeb356..89e0f013bf4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -392,7 +392,7 @@ public abstract class BaseTableMetadata extends AbstractHoodieTableMetadata { HoodieMetadataPayload metadataPayload = record.getData(); checkForSpuriousDeletes(metadataPayload, recordKey); try { - return metadataPayload.getFileList(dataMetaClient.getStorage(), partitionPath); + return metadataPayload.getFileList(dataMetaClient, partitionPath); } catch (Exception e) { throw new HoodieException("Failed to extract file-pathInfoList from the payload", e); } @@ -404,6 +404,7 @@ public abstract class BaseTableMetadata extends AbstractHoodieTableMetadata { } Map<String, List<StoragePathInfo>> fetchAllFilesInPartitionPaths(List<StoragePath> partitionPaths) { + // 
file:/private/var/folders/ww/dc150vl50815wqgbsx_98w9w0000gp/T/spark-a496632e-e844-4103-9fde-cc98e1afdca2/h1/dt=2021-01-06 Map<String, StoragePath> partitionIdToPathMap = partitionPaths.parallelStream() .collect( @@ -415,6 +416,7 @@ public abstract class BaseTableMetadata extends AbstractHoodieTableMetadata { ); HoodieTimer timer = HoodieTimer.start(); + // dt=2021-01-05 -> {StoragePath@24246} "file:/private/var/folders/ww/dc150vl50815wqgbsx_98w9w0000gp/T/spark-a496632e-e844-4103-9fde-cc98e1afdca2/h1/dt=2021-01-05" Map<String, HoodieRecord<HoodieMetadataPayload>> partitionIdRecordPairs = getRecordsByKeys(new ArrayList<>(partitionIdToPathMap.keySet()), MetadataPartitionType.FILES.getPartitionPath()); @@ -430,7 +432,7 @@ public abstract class BaseTableMetadata extends AbstractHoodieTableMetadata { HoodieMetadataPayload metadataPayload = e.getValue().getData(); checkForSpuriousDeletes(metadataPayload, partitionId); - List<StoragePathInfo> files = metadataPayload.getFileList(dataMetaClient.getStorage(), partitionPath); + List<StoragePathInfo> files = metadataPayload.getFileList(dataMetaClient, partitionPath); return Pair.of(partitionPath.toString(), files); }) .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index b0bb9670825..e24e1a0e464 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordGlobalLocation; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.TimelineUtils; import 
org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.hash.ColumnIndexID; @@ -39,6 +40,7 @@ import org.apache.hudi.common.util.hash.FileIndexID; import org.apache.hudi.common.util.hash.PartitionIndexID; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase; +import org.apache.hudi.storage.HoodieOSSStorageStrategy; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathInfo; @@ -47,6 +49,7 @@ import org.apache.hudi.util.Lazy; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.Path; import javax.annotation.Nullable; @@ -429,13 +432,21 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata /** * Returns the files added as part of this record. */ - public List<StoragePathInfo> getFileList(HoodieStorage storage, StoragePath partitionPath) { + public List<StoragePathInfo> getFileList(HoodieTableMetaClient dataMetaClient, StoragePath partitionPath) { + HoodieStorage storage = dataMetaClient.getStorage(); + String tableName = dataMetaClient.getTableConfig().getTableName(); + StoragePath dataBasePath = dataMetaClient.getBasePath(); long blockSize = storage.getDefaultBlockSize(partitionPath); return filterFileInfoEntries(false) .map(e -> { // NOTE: Since we know that the Metadata Table's Payload is simply a file-name we're // creating Hadoop's Path using more performant unsafe variant - return new StoragePathInfo(new StoragePath(partitionPath, e.getKey()), e.getValue().getSize(), + HashMap<String, String> config = new HashMap<>(); + config.put(HoodieOSSStorageStrategy.FILE_ID_KEY, FSUtils.getFileId(new Path(e.getKey()).getName())); + config.put(HoodieOSSStorageStrategy.TABLE_BASE_PATH, dataBasePath.getPathWithoutSchemeAndAuthority().toUri().getPath()); + StoragePath part = 
storage.storageLocation(FSUtils.getRelativePartitionPath(dataBasePath, partitionPath), config); + StoragePath filePath = new StoragePath(part, new Path(e.getKey()).getName()); + return new StoragePathInfo(filePath, e.getValue().getSize(), false, (short) 0, blockSize, 0); }) .collect(Collectors.toList()); diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieOSSStorageStrategy.java b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieOSSStorageStrategy.java new file mode 100644 index 00000000000..90fe64e242c --- /dev/null +++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieOSSStorageStrategy.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.storage; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class HoodieOSSStorageStrategy implements HoodieStorageStrategy { + + public static final String FILE_ID_KEY = "hoodie_file_id"; + public static final String TABLE_BASE_PATH = "hoodie_table_base_path"; + public static final String TABLE_NAME = "hoodie_table_name"; + public static final String TABLE_STORAGE_PATH = "hoodie_storage_path"; + private final String hoodieStoragePath; + private final String basePath; + public HoodieOSSStorageStrategy(StorageConfiguration<?> storageConf) { + this.hoodieStoragePath = ""; + this.basePath = ""; + } + + @Override + public StoragePath storageLocation(String path, Map<String, String> configMap) { + String fileID = configMap.get(FILE_ID_KEY); + String basePath = configMap.get(TABLE_BASE_PATH); + String storagePath = configMap.get(TABLE_STORAGE_PATH); + int hash = (path + fileID).hashCode() & Integer.MAX_VALUE; + String urlString = "/tmp/bucketA/" + hoodieStoragePath + "/" + hash + "/" + basePath + "/" + path; + return new StoragePath(urlString); + } + + @Override + public Set<StoragePath> getAllLocations(String partitionPath, boolean checkExist) { + HashSet<StoragePath> res = new HashSet<>(); + res.add( new StoragePath(basePath, partitionPath)); + return res; + } + + @Override + public StoragePath getRelativePath(String path, Map<String, String> configMap) { + String tableName = configMap.get(TABLE_NAME); + String[] res = path.split(tableName); + return new StoragePath(res[res.length - 1].replace("/", "")); + } +} diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java index 8de04f6903e..4d25f18e71a 100644 --- a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java +++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java @@ -37,6 +37,8 @@ import java.io.OutputStream; import java.net.URI; import 
java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.UUID; /** @@ -48,9 +50,24 @@ public abstract class HoodieStorage implements Closeable { public static final Logger LOG = LoggerFactory.getLogger(HoodieStorage.class); protected final StorageConfiguration<?> storageConf; + protected final HoodieOSSStorageStrategy ossStorageStrategy; public HoodieStorage(StorageConfiguration<?> storageConf) { this.storageConf = storageConf; + this.ossStorageStrategy = new HoodieOSSStorageStrategy(storageConf); + + } + + public StoragePath storageLocation(String path, Map<String, String> configMap) { + return ossStorageStrategy.storageLocation(path, configMap); + } + + public Set<StoragePath> getAllLocations(String partitionPath, boolean checkExist) { + return ossStorageStrategy.getAllLocations(partitionPath, checkExist); + } + + public StoragePath getRelativePath(String path, Map<String, String> configMap) { + return ossStorageStrategy.getRelativePath(path, configMap); } /** diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorageStrategy.java b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorageStrategy.java new file mode 100644 index 00000000000..9b9609a67eb --- /dev/null +++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorageStrategy.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.storage; + +import java.util.Map; +import java.util.Set; + +public interface HoodieStorageStrategy { + /** + * Return a storage location for the given path + * + * @param path + * @param configMap + * @return Append the appropriate prefix based on the Path and return + */ + StoragePath storageLocation(String path, Map<String, String> configMap); + + /** + * Return all possible StoragePaths + * + * @param partitionPath + * @param checkExist check if StoragePath truly exists or not. + * @return a set of storage partition paths + */ + Set<StoragePath> getAllLocations(String partitionPath, boolean checkExist); + + /** + * Return RelativePath based on path and locations. 
+ * + * @param path + * @return relative path + */ + StoragePath getRelativePath(String path, Map<String, String> configMap); +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala index b8912d58c5c..d167c4d6c8e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala @@ -189,6 +189,42 @@ class TestInsertTable extends HoodieSparkSqlTestBase { }) } + test("Test Insert Into with values and OSS storage") { + withRecordType()(withTempDir { tmp => + val tableName = generateTableName + // Create a partitioned table + spark.sql( + s""" + |create table $tableName ( + | id int, + | dt string, + | name string, + | price double, + | ts long + |) using hudi + | tblproperties (primaryKey = 'id', hoodie.storage.path = '/tmp/bucketA/') + | partitioned by (dt) + | location '${tmp.getCanonicalPath}/${tableName}' + """.stripMargin) + + println(s"""${tmp.getCanonicalPath}/${tableName}""") + // Note: Do not write the field alias, the partition field must be placed last. + spark.sql( + s""" + | insert into $tableName values + | (1, 'a1', 10, 1000, "2021-01-05"), + | (2, 'a2', 20, 2000, "2021-01-06"), + | (3, 'a3', 30, 3000, "2021-01-07") + """.stripMargin) + + checkAnswer(s"select id, name, price, ts, dt from $tableName where year(dt) > '2020' and lower(name) > 'a0'")( + Seq(1, "a1", 10.0, 1000, "2021-01-05"), + Seq(2, "a2", 20.0, 2000, "2021-01-06"), + Seq(3, "a3", 30.0, 3000, "2021-01-07") + ) + }) + } + test("Test Insert Into with static partition") { Seq("cow", "mor").foreach { tableType => withTempDir { tmp =>
