[
https://issues.apache.org/jira/browse/GOBBLIN-1709?focusedWorklogId=809167&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-809167
]
ASF GitHub Bot logged work on GOBBLIN-1709:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 15/Sep/22 15:55
Start Date: 15/Sep/22 15:55
Worklog Time Spent: 10m
Work Description: meethngala commented on code in PR #3560:
URL: https://github.com/apache/gobblin/pull/3560#discussion_r972165125
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import
org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
  // Database (namespace) the source table lives in; immutable identity of the dataset.
  private final String dbName;
  // Source table name; immutable identity of the dataset.
  private final String inputTableName;
  // Handle to the Iceberg table; used to enumerate snapshot file paths.
  private IcebergTable icebergTable;
  // Job properties; source of the metastore URI configuration keys below.
  protected Properties properties;
  // Filesystem used to qualify destination paths and stat source files.
  protected FileSystem fs;

  // Metastore URI of the copy source, absent when not configured.
  private Optional<String> sourceMetastoreURI;
  // Metastore URI of the copy target, absent when not configured.
  private Optional<String> targetMetastoreURI;

  /** Target metastore URI */
  public static final String TARGET_METASTORE_URI_KEY =
      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.metastore.uri";
  /** Target database name */
  public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+ public IcebergDataset(String db, String table, IcebergTable icebergTbl,
Properties properties, FileSystem fs) {
+ this.dbName = db;
+ this.inputTableName = table;
+ this.icebergTable = icebergTbl;
+ this.properties = properties;
+ this.fs = fs;
+ this.sourceMetastoreURI =
+
Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_METASTORE_URI_KEY));
+ this.targetMetastoreURI =
+
Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+ }
+
+ public IcebergDataset(String db, String table) {
+ this.dbName = db;
+ this.inputTableName = table;
+ }
+
  /**
   * Represents a source {@link FileStatus} and a {@link Path} destination.
   */
  @Data
  private static class SourceAndDestination {
    // File to copy, as observed on the filesystem it actually resides on.
    private final FileStatus source;
    // Fully-qualified location the file is to be copied to.
    private final Path destination;
  }
+
+ @Override
+ public String datasetURN() {
+ return this.dbName + "." + this.inputTableName;
+ }
+
+ @Override
+ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs,
CopyConfiguration configuration) {
+ return getCopyEntities(configuration);
+ }
+ /**
+ * Finds all files read by the table and generates CopyableFiles.
+ * For the specific semantics see {@link #getCopyEntities}.
+ */
+ @Override
+ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs,
CopyConfiguration configuration,
+ Comparator<FileSet<CopyEntity>> prioritizer,
PushDownRequestor<FileSet<CopyEntity>> requestor) {
+ return getCopyEntities(configuration);
+ }
+
+ /**
+ * Finds all files read by the table and generates {@link CopyEntity}s for
duplicating the table.
+ */
+ Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration
configuration) {
+ FileSet<CopyEntity> fileSet = new
IcebergTableFileSet(this.getInputTableName(), this, configuration);
+ return Iterators.singletonIterator(fileSet); }
+
+ /**
+ * Finds all files read by the table file set and generates {@link
CopyEntity}s for duplicating the table.
+ */
+ @VisibleForTesting
+ Collection<CopyEntity> generateCopyEntitiesForTableFileSet(CopyConfiguration
configuration) throws IOException {
+ String fileSet = this.getInputTableName();
+ List<CopyEntity> copyEntities = Lists.newArrayList();
+ log.info("Fetching all the files to be copied");
+ Map<Path, FileStatus> mapOfPathsToCopy = getFilePaths();
+
+ log.info("Fetching copyable file builders from their respective file sets
and adding to collection of copy entities");
+ for (CopyableFile.Builder builder :
getCopyableFilesFromPaths(mapOfPathsToCopy, configuration)) {
+ CopyableFile fileEntity =
+
builder.fileSet(fileSet).datasetOutputPath(this.fs.getUri().getPath()).build();
+ fileEntity.setSourceData(getSourceDataset());
+ fileEntity.setDestinationData(getDestinationDataset());
+ copyEntities.add(fileEntity);
+ }
+ return copyEntities;
+ }
+
+ /**
+ * Get builders for a {@link CopyableFile} for each file referred to by a
{@link org.apache.hadoop.hive.metastore.api.StorageDescriptor}.
+ */
+ List<CopyableFile.Builder> getCopyableFilesFromPaths(Map<Path, FileStatus>
paths, CopyConfiguration configuration) throws IOException {
+
+ List<CopyableFile.Builder> builders = Lists.newArrayList();
+ List<SourceAndDestination> dataFiles = Lists.newArrayList();
+ Configuration hadoopConfiguration = new Configuration();
+ FileSystem actualSourceFs;
+
+ for(Map.Entry<Path, FileStatus> entry : paths.entrySet()) {
+ dataFiles.add(new SourceAndDestination(entry.getValue(),
this.fs.makeQualified(entry.getKey())));
+ }
+
+ for(SourceAndDestination sourceAndDestination : dataFiles) {
+ actualSourceFs =
sourceAndDestination.getSource().getPath().getFileSystem(hadoopConfiguration);
+
+ // TODO Add ancestor owner and permissions in future releases
+ builders.add(CopyableFile.fromOriginAndDestination(actualSourceFs,
sourceAndDestination.getSource(),
+ sourceAndDestination.getDestination(), configuration));
+ }
+ return builders;
+ }
+ /**
+ * Finds all files read by the Iceberg table including metadata json file,
manifest files, nested manifest file paths and actual data files.
+ * Returns a map of path, file status for each file that needs to be copied
+ */
+ Map<Path, FileStatus> getFilePaths() throws IOException {
+ Map<Path, FileStatus> result = Maps.newHashMap();
+ IcebergTable icebergTable = this.getIcebergTable();
+ IcebergSnapshotInfo icebergSnapshotInfo =
icebergTable.getCurrentSnapshotInfo();
+
+ log.info("Fetching all file paths for the current snapshot of the Iceberg
table");
+ List<String> pathsToCopy = icebergSnapshotInfo.getAllPaths();
+
+ for(String pathString : pathsToCopy) {
+ Path path = new Path(pathString);
+ result.put(path, this.fs.getFileStatus(path));
+ }
+ return result;
+ }
+
+ DatasetDescriptor getSourceDataset() {
+ return getDatasetDescriptor(sourceMetastoreURI);
+ }
+
+ DatasetDescriptor getDestinationDataset() {
+ return getDatasetDescriptor(targetMetastoreURI);
+ }
+
+ @NotNull
+ private DatasetDescriptor getDatasetDescriptor(Optional<String>
stringMetastoreURI) {
+ String destinationTable = this.getDbName() + "." +
this.getInputTableName();
+
+ URI hiveMetastoreURI = null;
+ if (stringMetastoreURI.isPresent()) {
+ hiveMetastoreURI = URI.create(stringMetastoreURI.get());
+ }
+
+ DatasetDescriptor destinationDataset =
+ new DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG,
hiveMetastoreURI, destinationTable);
+ destinationDataset.addMetadata(DatasetConstants.FS_URI,
this.getFs().getUri().toString());
Review Comment:
yeah... these things are also unclear to me. I feel we will get more clarity
once we test it end to end.
Issue Time Tracking
-------------------
Worklog Id: (was: 809167)
Time Spent: 2.5h (was: 2h 20m)
> Create work units for Hive Catalog based Iceberg Datasets to support Distcp
> for Iceberg
> ---------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1709
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1709
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: distcp-ng
> Reporter: Meeth Gala
> Assignee: Issac Buenrostro
> Priority: Major
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
> We want to support Distcp for Iceberg based datasets.
> As a pilot, we are starting with Hive Catalog and will expand the
> functionality to cover all Iceberg based datasets.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)