[
https://issues.apache.org/jira/browse/GOBBLIN-1709?focusedWorklogId=808446&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-808446
]
ASF GitHub Bot logged work on GOBBLIN-1709:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 13/Sep/22 21:04
Start Date: 13/Sep/22 21:04
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3560:
URL: https://github.com/apache/gobblin/pull/3560#discussion_r969935673
##########
gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java:
##########
@@ -386,4 +386,4 @@ public Set<Characteristic> applicableCharacteristics() {
}
}
-}
+}
Review Comment:
typically we add a trailing newline. overall though, this file may have snuck in unintentionally... (?)
##########
gobblin-runtime/src/main/resources/templates/icebergDistcp.template:
##########
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# ====================================================================
+# Job configurations (can be changed)
+# ====================================================================
+
+# General job metadata
+job.group=DistcpIceberg
+job.name=EmbeddedDistcpIcebergTracking
+job.description=Embedded Iceberg-Distcp-ng toy job
+
Review Comment:
unrelatedly, you added a unit test for `IcebergDataset`, and that's looking good.
...have you been able to verify that this config actually works? if not, we
might reserve it for a follow-on commit later.
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableFileSet.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+
+/**
+ * A {@link IcebergFileSet} that generates {@link CopyEntity}s for a Hive Catalog based Iceberg table
+ */
+public class IcebergTableFileSet extends IcebergFileSet{
Review Comment:
nit: missing a space before '{'
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Finds {@link IcebergDataset}s. Will look for tables in a database using a {@link IcebergCatalog},
+ * and creates a {@link IcebergDataset} for each one.
+ */
+@Slf4j
+@AllArgsConstructor
+public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {
+
+  public static final String ICEBERG_DATASET_PREFIX = "iceberg.dataset";
+  public static final String ICEBERG_METASTORE_URI_KEY = ICEBERG_DATASET_PREFIX + ".hive.metastore.uri";
+  public static final String ICEBERG_DB_NAME = DatasetConstants.PLATFORM_ICEBERG + ".database.name";
+  public static final String ICEBERG_TABLE_NAME = DatasetConstants.PLATFORM_ICEBERG + ".table.name";
Review Comment:
I expected `ICEBERG_DATASET_PREFIX` rather than
`DatasetConstants.PLATFORM_ICEBERG`
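To illustrate what the reviewer seems to expect (a hedged sketch — `IcebergConfigKeys` is a hypothetical holder class, not from the PR), all keys would hang off `ICEBERG_DATASET_PREFIX` rather than the bare platform constant:

```java
// Hypothetical sketch of the suggested constant wiring: every config key for
// this feature derives from ICEBERG_DATASET_PREFIX, instead of the two last
// constants using DatasetConstants.PLATFORM_ICEBERG (i.e. just "iceberg").
public class IcebergConfigKeys {
  public static final String ICEBERG_DATASET_PREFIX = "iceberg.dataset";
  public static final String ICEBERG_METASTORE_URI_KEY = ICEBERG_DATASET_PREFIX + ".hive.metastore.uri";
  // as suggested in the review: prefix with ICEBERG_DATASET_PREFIX
  public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name";
  public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name";

  public static void main(String[] args) {
    // resulting keys, e.g. "iceberg.dataset.database.name"
    System.out.println(ICEBERG_DB_NAME);
    System.out.println(ICEBERG_TABLE_NAME);
  }
}
```

This keeps every job property under one discoverable namespace, rather than splitting them between `iceberg.dataset.*` and `iceberg.*`.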
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+ private final String dbName;
+ private final String inputTableName;
+ private IcebergTable icebergTable;
+ protected Properties properties;
+ protected FileSystem fs;
+
+ private Optional<String> sourceMetastoreURI;
+ private Optional<String> targetMetastoreURI;
+
+  /** Target metastore URI */
+  public static final String TARGET_METASTORE_URI_KEY =
+      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.metastore.uri";
+  /** Target database name */
+  public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+  public IcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem fs) {
+    this.dbName = db;
+    this.inputTableName = table;
+    this.icebergTable = icebergTbl;
+    this.properties = properties;
+    this.fs = fs;
+    this.sourceMetastoreURI =
+        Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_METASTORE_URI_KEY));
+    this.targetMetastoreURI =
+        Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+  }
+
+ public IcebergDataset(String db, String table) {
+ this.dbName = db;
+ this.inputTableName = table;
+ }
Review Comment:
seems potentially problematic--what's this for? won't it result in NPEs
when the core methods of the class are invoked?
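The hazard the reviewer is pointing at can be sketched in isolation (hypothetical stand-in types, not the real Gobblin classes): a constructor that leaves collaborator fields null makes some methods safe and others a latent NPE.

```java
// Minimal stand-in illustrating the reviewer's NPE concern with the two-arg
// constructor: Table/Dataset here are simplified hypothetical types.
import java.util.Collections;
import java.util.Iterator;

public class PartialInitDemo {
  static class Table {
    Iterator<String> currentPaths() { return Collections.emptyIterator(); }
  }

  static class Dataset {
    private final String dbName;
    private final String tableName;
    private Table table;  // left null by the two-arg constructor

    Dataset(String db, String tbl, Table table) {
      this.dbName = db; this.tableName = tbl; this.table = table;
    }

    // analogous to IcebergDataset(String, String): `table` stays null
    Dataset(String db, String tbl) {
      this.dbName = db; this.tableName = tbl;
    }

    String urn() { return dbName + "." + tableName; }          // safe either way
    Iterator<String> paths() { return table.currentPaths(); }  // NPE if table == null
  }

  public static void main(String[] args) {
    Dataset d = new Dataset("db", "tbl");
    System.out.println(d.urn());  // prints "db.tbl"
    try {
      d.paths();                  // throws NullPointerException
    } catch (NullPointerException e) {
      System.out.println("NPE, as the review predicts");
    }
  }
}
```

If the two-arg form exists only for tests, annotating it `@VisibleForTesting` (or removing it in favor of mocks) would make the intent explicit.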
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+ private final String dbName;
+ private final String inputTableName;
+ private IcebergTable icebergTable;
+ protected Properties properties;
+ protected FileSystem fs;
+
+ private Optional<String> sourceMetastoreURI;
+ private Optional<String> targetMetastoreURI;
+
+  /** Target metastore URI */
+  public static final String TARGET_METASTORE_URI_KEY =
+      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.metastore.uri";
+  /** Target database name */
+  public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+  public IcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem fs) {
+    this.dbName = db;
+    this.inputTableName = table;
+    this.icebergTable = icebergTbl;
+    this.properties = properties;
+    this.fs = fs;
+    this.sourceMetastoreURI =
+        Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_METASTORE_URI_KEY));
+    this.targetMetastoreURI =
+        Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+  }
+
+ public IcebergDataset(String db, String table) {
+ this.dbName = db;
+ this.inputTableName = table;
+ }
+
+ /**
+ * Represents a source {@link FileStatus} and a {@link Path} destination.
+ */
+ @Data
+ private static class SourceAndDestination {
+ private final FileStatus source;
+ private final Path destination;
+ }
+
+ @Override
+ public String datasetURN() {
+ return this.dbName + "." + this.inputTableName;
+ }
+
+ @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration) {
+ return getCopyEntities(configuration);
+ }
+ /**
+ * Finds all files read by the table and generates CopyableFiles.
+ * For the specific semantics see {@link #getCopyEntities}.
+ */
+ @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration,
+      Comparator<FileSet<CopyEntity>> prioritizer, PushDownRequestor<FileSet<CopyEntity>> requestor) {
+ return getCopyEntities(configuration);
+ }
+
+ /**
+   * Finds all files read by the table and generates {@link CopyEntity}s for duplicating the table.
+   */
+  Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration configuration) {
+    FileSet<CopyEntity> fileSet = new IcebergTableFileSet(this.getInputTableName(), this, configuration);
+ return Iterators.singletonIterator(fileSet); }
+
+  /**
+   * Finds all files read by the table file set and generates {@link CopyEntity}s for duplicating the table.
+   */
+  @VisibleForTesting
+  Collection<CopyEntity> generateCopyEntitiesForTableFileSet(CopyConfiguration configuration) throws IOException {
+    String fileSet = this.getInputTableName();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    log.info("Fetching all the files to be copied");
+    Map<Path, FileStatus> mapOfPathsToCopy = getFilePaths();
+
+    log.info("Fetching copyable file builders from their respective file sets and adding to collection of copy entities");
+    for (CopyableFile.Builder builder : getCopyableFilesFromPaths(mapOfPathsToCopy, configuration)) {
+      CopyableFile fileEntity =
+          builder.fileSet(fileSet).datasetOutputPath(this.fs.getUri().getPath()).build();
+      fileEntity.setSourceData(getSourceDataset());
+      fileEntity.setDestinationData(getDestinationDataset());
+      copyEntities.add(fileEntity);
+    }
+ return copyEntities;
+ }
+
+  /**
+   * Get builders for a {@link CopyableFile} for each file referred to by a {@link org.apache.hadoop.hive.metastore.api.StorageDescriptor}.
+   */
+  List<CopyableFile.Builder> getCopyableFilesFromPaths(Map<Path, FileStatus> paths, CopyConfiguration configuration) throws IOException {
+
+    List<CopyableFile.Builder> builders = Lists.newArrayList();
+    List<SourceAndDestination> dataFiles = Lists.newArrayList();
+    Configuration hadoopConfiguration = new Configuration();
+    FileSystem actualSourceFs;
+
+    for(Map.Entry<Path, FileStatus> entry : paths.entrySet()) {
+      dataFiles.add(new SourceAndDestination(entry.getValue(), this.fs.makeQualified(entry.getKey())));
+    }
+
+    for(SourceAndDestination sourceAndDestination : dataFiles) {
+      actualSourceFs = sourceAndDestination.getSource().getPath().getFileSystem(hadoopConfiguration);
+
+      // TODO Add ancestor owner and permissions in future releases
+      builders.add(CopyableFile.fromOriginAndDestination(actualSourceFs, sourceAndDestination.getSource(),
+          sourceAndDestination.getDestination(), configuration));
+    }
+ return builders;
+ }
+  /**
+   * Finds all files read by the Iceberg table including metadata json file, manifest files, nested manifest file paths and actual data files.
+   * Returns a map of path, file status for each file that needs to be copied
+   */
+  Map<Path, FileStatus> getFilePaths() throws IOException {
Review Comment:
why should this be package protected? is it `@VisibleForTesting`?
shouldn't it be `private` or `protected`--depending on how we envision
subclassing?
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import com.google.api.client.util.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyContext;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.PreserveAttributes;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.TableOperations;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.*;
+
+
+public class IcebergDatasetTest {
+
+  static final String METADATA_PATH = "/root/iceberg/test/metadata";
+  static final String MANIFEST_PATH = "/root/iceberg/test/metadata/test_manifest";
+  static final String MANIFEST_FILE_PATH1 = "/root/iceberg/test/metadata/test_manifest/data/a";
+  static final String MANIFEST_FILE_PATH2 = "/root/iceberg/test/metadata/test_manifest/data/b";
+
+ @Test
+ public void testGetFilePaths() throws IOException {
+
+ List<String> pathsToCopy = new ArrayList<>();
+ pathsToCopy.add(MANIFEST_FILE_PATH1);
+ pathsToCopy.add(MANIFEST_FILE_PATH2);
+ Map<Path, FileStatus> expected = Maps.newHashMap();
+ expected.put(new Path(MANIFEST_FILE_PATH1), null);
+ expected.put(new Path(MANIFEST_FILE_PATH2), null);
+
+    IcebergTable icebergTable = Mockito.mock(IcebergTable.class);
+    FileSystem fs = Mockito.mock(FileSystem.class);
+    IcebergSnapshotInfo icebergSnapshotInfo = Mockito.mock(IcebergSnapshotInfo.class);
+
+    Mockito.when(icebergTable.getCurrentSnapshotInfo()).thenReturn(icebergSnapshotInfo);
+    Mockito.when(icebergSnapshotInfo.getAllPaths()).thenReturn(pathsToCopy);
+    IcebergDataset icebergDataset = new IcebergDataset("test_db_name", "test_tbl_name", icebergTable, new Properties(), fs);
+
+    Map<Path, FileStatus> actual = icebergDataset.getFilePaths();
+ Assert.assertEquals(actual, expected);
+ }
+
+  /**
+   * Test case to copy all the file paths for a mocked iceberg table. This is a full copy overwriting everything on the destination
+   */
+  @Test
+  public void testGenerateCopyEntitiesForTableFileSet() throws IOException, URISyntaxException {
+
+    FileSystem fs = Mockito.mock(FileSystem.class);
+    String test_db_name = "test_db_name";
+    String test_table_name = "test_tbl_name";
+    Set<String> setOfFilePaths = new HashSet<>(Arrays.asList(METADATA_PATH, MANIFEST_PATH, MANIFEST_FILE_PATH1, MANIFEST_FILE_PATH2));
+
+    Properties properties = new Properties();
+    properties.setProperty("data.publisher.final.dir", "/test");
+
+    CopyConfiguration copyConfiguration = CopyConfiguration.builder(null, properties)
+        .preserve(PreserveAttributes.fromMnemonicString(""))
+        .copyContext(new CopyContext())
+        .build();
+    TableOperations tableOperations = Mockito.mock(TableOperations.class);
+
+    IcebergTable icebergTable = new MockedIcebergTable(tableOperations);
+    IcebergDataset icebergDataset = new IcebergDataset(test_db_name, test_table_name, icebergTable, new Properties(), fs);
+
+ FileStatus fileStatus1 = new FileStatus();
+ fileStatus1.setPath(new Path(METADATA_PATH));
+ FileStatus fileStatus2 = new FileStatus();
+ fileStatus2.setPath(new Path(MANIFEST_PATH));
+ FileStatus fileStatus3 = new FileStatus();
+ fileStatus3.setPath(new Path(MANIFEST_FILE_PATH1));
+ FileStatus fileStatus4 = new FileStatus();
+ fileStatus4.setPath(new Path(MANIFEST_FILE_PATH2));
+
+ Path path1 = new Path(METADATA_PATH);
+ Path path2 = new Path(MANIFEST_PATH);
+ Path path3 = new Path(MANIFEST_FILE_PATH1);
+ Path path4 = new Path(MANIFEST_FILE_PATH2);
+
+
+    Mockito.when(fs.makeQualified(any(Path.class))).thenReturn(new Path("/root/iceberg/test/destination/sub_path_destination"));
+    Mockito.when(fs.getFileStatus(path1)).thenReturn(fileStatus1);
+    Mockito.when(fs.getFileStatus(path2)).thenReturn(fileStatus2);
+    Mockito.when(fs.getFileStatus(path3)).thenReturn(fileStatus3);
+    Mockito.when(fs.getFileStatus(path4)).thenReturn(fileStatus4);
+    Mockito.when(fs.getUri()).thenReturn(new URI(null, null, "/root/iceberg/test/output", null));
Review Comment:
this mocking of the `FileSystem` also seems worth abstracting into a helper
method
...and, indentation looks off on last line
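The shape of the suggested helper can be sketched without Mockito or Hadoop on the classpath (a stand-in `Fs` interface below is hypothetical, purely to keep the example self-contained): one method owns the whole path-to-status mapping instead of five inline stubbings per test.

```java
// Sketch: extract repeated "stub a status for each path" setup into one helper.
// In the real test this would return a Mockito-mocked Hadoop FileSystem; here a
// tiny functional interface stands in so the example compiles on its own.
import java.util.HashMap;
import java.util.Map;

public class MockFsHelperDemo {
  interface Fs {
    String getFileStatusPath(String path);  // stand-in for fs.getFileStatus(path)
  }

  /** One helper builds the whole path -> status mapping for a test. */
  static Fs mockFsForPaths(String... paths) {
    Map<String, String> statuses = new HashMap<>();
    for (String p : paths) {
      statuses.put(p, p);  // stand-in: the "status" is just the path itself
    }
    return statuses::get;  // unknown paths yield null, like an unstubbed mock
  }

  public static void main(String[] args) {
    Fs fs = mockFsForPaths("/root/iceberg/test/metadata",
                           "/root/iceberg/test/metadata/test_manifest");
    System.out.println(fs.getFileStatusPath("/root/iceberg/test/metadata"));
  }
}
```

With Mockito, the equivalent helper would loop over the paths and issue one `Mockito.when(fs.getFileStatus(path)).thenReturn(status)` per entry, so each test declares only its path list.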
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import com.google.api.client.util.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyContext;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.PreserveAttributes;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.TableOperations;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.*;
+
+
+public class IcebergDatasetTest {
+
+  static final String METADATA_PATH = "/root/iceberg/test/metadata";
+  static final String MANIFEST_PATH = "/root/iceberg/test/metadata/test_manifest";
+  static final String MANIFEST_FILE_PATH1 = "/root/iceberg/test/metadata/test_manifest/data/a";
+  static final String MANIFEST_FILE_PATH2 = "/root/iceberg/test/metadata/test_manifest/data/b";
+
+ @Test
+ public void testGetFilePaths() throws IOException {
+
+ List<String> pathsToCopy = new ArrayList<>();
+ pathsToCopy.add(MANIFEST_FILE_PATH1);
+ pathsToCopy.add(MANIFEST_FILE_PATH2);
+ Map<Path, FileStatus> expected = Maps.newHashMap();
+ expected.put(new Path(MANIFEST_FILE_PATH1), null);
+ expected.put(new Path(MANIFEST_FILE_PATH2), null);
+
+    IcebergTable icebergTable = Mockito.mock(IcebergTable.class);
+    FileSystem fs = Mockito.mock(FileSystem.class);
+    IcebergSnapshotInfo icebergSnapshotInfo = Mockito.mock(IcebergSnapshotInfo.class);
+
+    Mockito.when(icebergTable.getCurrentSnapshotInfo()).thenReturn(icebergSnapshotInfo);
+    Mockito.when(icebergSnapshotInfo.getAllPaths()).thenReturn(pathsToCopy);
+    IcebergDataset icebergDataset = new IcebergDataset("test_db_name", "test_tbl_name", icebergTable, new Properties(), fs);
+
+    Map<Path, FileStatus> actual = icebergDataset.getFilePaths();
+ Assert.assertEquals(actual, expected);
+ }
+
+ /**
+   * Test case to copy all the file paths for a mocked iceberg table. This is a full copy overwriting everything on the destination
Review Comment:
we're not actually copying files, right? ... just calculating the
`CopyEntity` collection
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileSet.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import lombok.Getter;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.partition.FileSet;
+
+/**
+ * A {@link FileSet} for Iceberg datasets. Contains information on Iceberg Dataset associated with an Iceberg table.
+ */
+@Getter
+public abstract class IcebergFileSet extends FileSet<CopyEntity> {
Review Comment:
I'm uncertain we actually need this thin abstract fileset with iceberg, in
the analogous way we have for hive. the latter has both partitioned and
unpartitioned hive file sets, so that gives a common base. at first blush,
that's not on the iceberg side.
what would you say to combining them so only one iceberg fileset class?
perhaps keep the name `IcebergTableFileSet`, just in case we later realize
there is in fact a need for `IcebergFileSet`. if so, we wouldn't then need to
rename the class already in use and referenced downstream.
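The suggested collapse can be sketched with simplified stand-ins (`FileSet`/`CopyEntity` below are hypothetical reductions of the Gobblin types, not the real API): one concrete class extends the generic fileset directly, with no intermediate abstract layer.

```java
// Sketch of the suggested collapse: a single concrete IcebergTableFileSet
// extending FileSet<CopyEntity> directly, skipping an abstract IcebergFileSet.
import java.util.Collection;
import java.util.Collections;

public class CombinedFileSetDemo {
  static class CopyEntity {}

  static abstract class FileSet<T> {
    private final String name;
    FileSet(String name) { this.name = name; }
    String getName() { return name; }
    abstract Collection<T> generateEntities();
  }

  // concrete, and keeps the IcebergTableFileSet name so downstream references
  // need no rename if an abstract base is reintroduced later
  static class IcebergTableFileSet extends FileSet<CopyEntity> {
    IcebergTableFileSet(String name) { super(name); }
    @Override Collection<CopyEntity> generateEntities() { return Collections.emptyList(); }
  }

  public static void main(String[] args) {
    FileSet<CopyEntity> fileSet = new IcebergTableFileSet("test_tbl_name");
    System.out.println(fileSet.getName());
  }
}
```

If a partitioned/unpartitioned split ever appears on the iceberg side, the abstract layer can be slid back in underneath without touching callers.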
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Finds {@link IcebergDataset}s. Will look for tables in a database using a {@link IcebergCatalog},
+ * and creates a {@link IcebergDataset} for each one.
+ */
+@Slf4j
+@AllArgsConstructor
+public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {
+
+  public static final String ICEBERG_DATASET_PREFIX = "iceberg.dataset";
+  public static final String ICEBERG_METASTORE_URI_KEY = ICEBERG_DATASET_PREFIX + ".hive.metastore.uri";
+  public static final String ICEBERG_DB_NAME = DatasetConstants.PLATFORM_ICEBERG + ".database.name";
+  public static final String ICEBERG_TABLE_NAME = DatasetConstants.PLATFORM_ICEBERG + ".table.name";
+
+ private String dbName;
+ private String tblName;
+ private final Properties properties;
+ protected final FileSystem fs;
+
+ @Override
+ public List<IcebergDataset> findDatasets() throws IOException {
+ List<IcebergDataset> matchingDatasets = new ArrayList<>();
+ /*
+ * Both Iceberg database name and table name are mandatory,
+     * since we are currently only supporting Hive Catalog based Iceberg tables.
Review Comment:
there's no fundamental incompatibility (at least IIRC it's possible to scan an iceberg `HiveCatalog`). it seems more for simplicity that, to begin with, we copy just one iceberg table at a time.
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Finds {@link IcebergDataset}s. Will look for tables in a database using a {@link IcebergCatalog},
+ * and creates a {@link IcebergDataset} for each one.
+ */
+@Slf4j
+@AllArgsConstructor
+public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {
+
+ public static final String ICEBERG_DATASET_PREFIX = "iceberg.dataset";
+ public static final String ICEBERG_METASTORE_URI_KEY = ICEBERG_DATASET_PREFIX + ".hive.metastore.uri";
+ public static final String ICEBERG_DB_NAME = DatasetConstants.PLATFORM_ICEBERG + ".database.name";
+ public static final String ICEBERG_TABLE_NAME = DatasetConstants.PLATFORM_ICEBERG + ".table.name";
+
+ private String dbName;
+ private String tblName;
+ private final Properties properties;
+ protected final FileSystem fs;
+
+ @Override
+ public List<IcebergDataset> findDatasets() throws IOException {
+ List<IcebergDataset> matchingDatasets = new ArrayList<>();
+ /*
+ * Both Iceberg database name and table name are mandatory,
+ * since we are currently only supporting Hive Catalog based Iceberg tables.
+ * The design will support defaults and other catalogs in future releases.
+ */
+ if (properties.getProperty(ICEBERG_DB_NAME) == null || properties.getProperty(ICEBERG_TABLE_NAME) == null) {
Review Comment:
the more common pattern is to test `StringUtils.isNotBlank(properties.getProperty(x))`
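To make the suggestion concrete, here is a minimal, dependency-free sketch of the check (the `ConfigValidation` class name is illustrative; the PR would simply call commons-lang3's `StringUtils.isNotBlank` directly). Unlike the plain `== null` test, a blank-check also rejects empty and whitespace-only values:

```java
public class ConfigValidation {
    // Same semantics as org.apache.commons.lang3.StringUtils.isNotBlank:
    // true only for a non-null string with at least one non-whitespace character.
    public static boolean isNotBlank(String s) {
        if (s == null) {
            return false;
        }
        for (int i = 0; i < s.length(); i++) {
            if (!Character.isWhitespace(s.charAt(i))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isNotBlank("db1"));  // a real value passes
        System.out.println(isNotBlank("   ")); // whitespace-only fails; a `== null` check alone would miss this
    }
}
```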
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Finds {@link IcebergDataset}s. Will look for tables in a database using a {@link IcebergCatalog},
+ * and creates a {@link IcebergDataset} for each one.
+ */
+@Slf4j
+@AllArgsConstructor
+public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {
+
+ public static final String ICEBERG_DATASET_PREFIX = "iceberg.dataset";
+ public static final String ICEBERG_METASTORE_URI_KEY = ICEBERG_DATASET_PREFIX + ".hive.metastore.uri";
Review Comment:
nit: suggest: `ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY`
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Finds {@link IcebergDataset}s. Will look for tables in a database using a {@link IcebergCatalog},
+ * and creates a {@link IcebergDataset} for each one.
+ */
+@Slf4j
+@AllArgsConstructor
+public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {
+
+ public static final String ICEBERG_DATASET_PREFIX = "iceberg.dataset";
+ public static final String ICEBERG_METASTORE_URI_KEY = ICEBERG_DATASET_PREFIX + ".hive.metastore.uri";
+ public static final String ICEBERG_DB_NAME = DatasetConstants.PLATFORM_ICEBERG + ".database.name";
+ public static final String ICEBERG_TABLE_NAME = DatasetConstants.PLATFORM_ICEBERG + ".table.name";
+
+ private String dbName;
+ private String tblName;
+ private final Properties properties;
+ protected final FileSystem fs;
+
+ @Override
+ public List<IcebergDataset> findDatasets() throws IOException {
+ List<IcebergDataset> matchingDatasets = new ArrayList<>();
+ /*
+ * Both Iceberg database name and table name are mandatory,
+ * since we are currently only supporting Hive Catalog based Iceberg tables.
+ * The design will support defaults and other catalogs in future releases.
+ */
+ if (properties.getProperty(ICEBERG_DB_NAME) == null || properties.getProperty(ICEBERG_TABLE_NAME) == null) {
+ throw new IOException("Iceberg database name or Iceberg table name is missing");
+ }
+ this.dbName = properties.getProperty(ICEBERG_DB_NAME);
+ this.tblName = properties.getProperty(ICEBERG_TABLE_NAME);
+
+ Configuration configuration = HadoopUtils.getConfFromProperties(properties);
+
+ IcebergCatalog icebergCatalog = IcebergCatalogFactory.create(configuration);
+ IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
+ // Currently, we only support one dataset per iceberg table
+ matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergTable, properties, fs));
+ log.info("Found {} matching datasets: {}", matchingDatasets.size(), matchingDatasets);
+
+ return matchingDatasets;
+ }
+
+ @Override
+ public Path commonDatasetRoot() {
Review Comment:
what are the semantics of `null`? `HiveDatasetFinder` itself has:
```
@Override
public Path commonDatasetRoot() {
return new Path("/");
}
```
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Finds {@link IcebergDataset}s. Will look for tables in a database using a {@link IcebergCatalog},
+ * and creates a {@link IcebergDataset} for each one.
+ */
+@Slf4j
+@AllArgsConstructor
+public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {
+
+ public static final String ICEBERG_DATASET_PREFIX = "iceberg.dataset";
+ public static final String ICEBERG_METASTORE_URI_KEY = ICEBERG_DATASET_PREFIX + ".hive.metastore.uri";
+ public static final String ICEBERG_DB_NAME = DatasetConstants.PLATFORM_ICEBERG + ".database.name";
+ public static final String ICEBERG_TABLE_NAME = DatasetConstants.PLATFORM_ICEBERG + ".table.name";
+
+ private String dbName;
+ private String tblName;
+ private final Properties properties;
+ protected final FileSystem fs;
+
+ @Override
+ public List<IcebergDataset> findDatasets() throws IOException {
+ List<IcebergDataset> matchingDatasets = new ArrayList<>();
+ /*
+ * Both Iceberg database name and table name are mandatory,
+ * since we are currently only supporting Hive Catalog based Iceberg tables.
+ * The design will support defaults and other catalogs in future releases.
+ */
+ if (properties.getProperty(ICEBERG_DB_NAME) == null || properties.getProperty(ICEBERG_TABLE_NAME) == null) {
+ throw new IOException("Iceberg database name or Iceberg table name is missing");
Review Comment:
doesn't really feel like an `IOException`. it's somewhat a stretch (since it's a missing arg), but it could be `IllegalArgumentException`; otherwise, just fall back to `RuntimeException`
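A hedged sketch of how the suggested exception change might look (the class and method names here are illustrative, not from the PR): a missing or blank configuration value is a caller error, so the unchecked `IllegalArgumentException` arguably fits better than `IOException`.

```java
import java.util.Properties;

public class RequiredConfig {
    // Returns the property value, or throws IllegalArgumentException when it is
    // missing or blank -- treating bad config as a caller error, not an I/O failure.
    public static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("missing required Iceberg property: " + key);
        }
        return value;
    }
}
```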
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Finds {@link IcebergDataset}s. Will look for tables in a database using a {@link IcebergCatalog},
+ * and creates a {@link IcebergDataset} for each one.
+ */
+@Slf4j
+@AllArgsConstructor
+public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {
+
+ public static final String ICEBERG_DATASET_PREFIX = "iceberg.dataset";
+ public static final String ICEBERG_METASTORE_URI_KEY = ICEBERG_DATASET_PREFIX + ".hive.metastore.uri";
+ public static final String ICEBERG_DB_NAME = DatasetConstants.PLATFORM_ICEBERG + ".database.name";
+ public static final String ICEBERG_TABLE_NAME = DatasetConstants.PLATFORM_ICEBERG + ".table.name";
+
+ private String dbName;
+ private String tblName;
+ private final Properties properties;
+ protected final FileSystem fs;
+
+ @Override
+ public List<IcebergDataset> findDatasets() throws IOException {
+ List<IcebergDataset> matchingDatasets = new ArrayList<>();
+ /*
+ * Both Iceberg database name and table name are mandatory,
+ * since we are currently only supporting Hive Catalog based Iceberg tables.
+ * The design will support defaults and other catalogs in future releases.
+ */
+ if (properties.getProperty(ICEBERG_DB_NAME) == null || properties.getProperty(ICEBERG_TABLE_NAME) == null) {
+ throw new IOException("Iceberg database name or Iceberg table name is missing");
+ }
+ this.dbName = properties.getProperty(ICEBERG_DB_NAME);
+ this.tblName = properties.getProperty(ICEBERG_TABLE_NAME);
+
+ Configuration configuration = HadoopUtils.getConfFromProperties(properties);
+
+ IcebergCatalog icebergCatalog = IcebergCatalogFactory.create(configuration);
+ IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
+ // Currently, we only support one dataset per iceberg table
+ matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergTable, properties, fs));
Review Comment:
perhaps a comment to note that error handling / verification of table
existence comes later in `IcebergTable.getCurrentSnapshotInfo`?
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+ private final String dbName;
+ private final String inputTableName;
+ private IcebergTable icebergTable;
+ protected Properties properties;
+ protected FileSystem fs;
+
+ private Optional<String> sourceMetastoreURI;
+ private Optional<String> targetMetastoreURI;
+
+ /** Target metastore URI */
+ public static final String TARGET_METASTORE_URI_KEY =
+ IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.metastore.uri";
+ /** Target database name */
+ public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+ public IcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem fs) {
+ this.dbName = db;
+ this.inputTableName = table;
+ this.icebergTable = icebergTbl;
+ this.properties = properties;
+ this.fs = fs;
+ this.sourceMetastoreURI =
+ Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_METASTORE_URI_KEY));
+ this.targetMetastoreURI =
+ Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+ }
+
+ public IcebergDataset(String db, String table) {
+ this.dbName = db;
+ this.inputTableName = table;
+ }
+
+ /**
+ * Represents a source {@link FileStatus} and a {@link Path} destination.
+ */
+ @Data
+ private static class SourceAndDestination {
+ private final FileStatus source;
+ private final Path destination;
+ }
+
+ @Override
+ public String datasetURN() {
+ return this.dbName + "." + this.inputTableName;
Review Comment:
doesn't look like a URN, at least not FQ. will this work? if not, maybe
sprinkle in a `// TODO: verify!`
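For illustration only, a sketch of what a more fully-qualified URN could look like; the `iceberg://` scheme and layout here are hypothetical, not something the PR or Gobblin defines:

```java
public class IcebergUrn {
    // Hypothetical fully-qualified form: platform scheme + catalog/metastore authority + db.table.
    // The PR's datasetURN() returns only "db.table", which is ambiguous across catalogs.
    public static String fullyQualifiedUrn(String metastoreAuthority, String db, String table) {
        return "iceberg://" + metastoreAuthority + "/" + db + "." + table;
    }
}
```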
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+ private final String dbName;
+ private final String inputTableName;
+ private IcebergTable icebergTable;
+ protected Properties properties;
+ protected FileSystem fs;
+
+ private Optional<String> sourceMetastoreURI;
+ private Optional<String> targetMetastoreURI;
+
+ /** Target metastore URI */
+ public static final String TARGET_METASTORE_URI_KEY =
+ IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.metastore.uri";
+ /** Target database name */
+ public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+ public IcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem fs) {
+ this.dbName = db;
+ this.inputTableName = table;
+ this.icebergTable = icebergTbl;
+ this.properties = properties;
+ this.fs = fs;
+ this.sourceMetastoreURI =
+ Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_METASTORE_URI_KEY));
+ this.targetMetastoreURI =
+ Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+ }
+
+ public IcebergDataset(String db, String table) {
+ this.dbName = db;
+ this.inputTableName = table;
+ }
+
+ /**
+ * Represents a source {@link FileStatus} and a {@link Path} destination.
+ */
+ @Data
+ private static class SourceAndDestination {
+ private final FileStatus source;
+ private final Path destination;
+ }
+
+ @Override
+ public String datasetURN() {
+ return this.dbName + "." + this.inputTableName;
+ }
+
+ @Override
+ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration) {
+ return getCopyEntities(configuration);
+ }
+ /**
+ * Finds all files read by the table and generates CopyableFiles.
+ * For the specific semantics see {@link #getCopyEntities}.
+ */
+ @Override
+ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration,
+ Comparator<FileSet<CopyEntity>> prioritizer, PushDownRequestor<FileSet<CopyEntity>> requestor) {
+ return getCopyEntities(configuration);
Review Comment:
clearly not implemented, since we don't even touch several input params.
please add a todo comment acknowledging incompleteness
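The shape such a TODO would eventually fill in might look like the generic sketch below (hedged: the real method would sort Gobblin's `FileSet<CopyEntity>` collection and also honor `requestor`, which is omitted here):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

public class PrioritizedIteration {
    // Generic shape of "return file sets in prioritizer order":
    // materialize, sort by the supplied comparator, then iterate.
    public static <T> Iterator<T> prioritized(List<T> fileSets, Comparator<T> prioritizer) {
        List<T> sorted = new ArrayList<>(fileSets);
        sorted.sort(prioritizer);
        return sorted.iterator();
    }
}
```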
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+ private final String dbName;
+ private final String inputTableName;
+ private IcebergTable icebergTable;
+ protected Properties properties;
+ protected FileSystem fs;
+
+ private Optional<String> sourceMetastoreURI;
+ private Optional<String> targetMetastoreURI;
+
+ /** Target metastore URI */
+ public static final String TARGET_METASTORE_URI_KEY =
+ IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.metastore.uri";
+ /** Target database name */
+ public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+ public IcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem fs) {
+ this.dbName = db;
+ this.inputTableName = table;
+ this.icebergTable = icebergTbl;
+ this.properties = properties;
+ this.fs = fs;
+ this.sourceMetastoreURI =
+ Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_METASTORE_URI_KEY));
+ this.targetMetastoreURI =
+ Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+ }
+
+ public IcebergDataset(String db, String table) {
+ this.dbName = db;
+ this.inputTableName = table;
+ }
+
+ /**
+ * Represents a source {@link FileStatus} and a {@link Path} destination.
+ */
+ @Data
+ private static class SourceAndDestination {
+ private final FileStatus source;
+ private final Path destination;
+ }
+
+ @Override
+ public String datasetURN() {
+ return this.dbName + "." + this.inputTableName;
+ }
+
+ @Override
+ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration) {
+ return getCopyEntities(configuration);
+ }
+ /**
+ * Finds all files read by the table and generates CopyableFiles.
+ * For the specific semantics see {@link #getCopyEntities}.
+ */
+ @Override
+ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration,
+ Comparator<FileSet<CopyEntity>> prioritizer, PushDownRequestor<FileSet<CopyEntity>> requestor) {
+ return getCopyEntities(configuration);
+ }
+
+ /**
+ * Finds all files read by the table and generates {@link CopyEntity}s for duplicating the table.
+ */
+ Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration configuration) {
+ FileSet<CopyEntity> fileSet = new IcebergTableFileSet(this.getInputTableName(), this, configuration);
+ return Iterators.singletonIterator(fileSet); }
+
+ /**
+ * Finds all files read by the table file set and generates {@link CopyEntity}s for duplicating the table.
+ */
+ @VisibleForTesting
+ Collection<CopyEntity> generateCopyEntitiesForTableFileSet(CopyConfiguration configuration) throws IOException {
+ String fileSet = this.getInputTableName();
+ List<CopyEntity> copyEntities = Lists.newArrayList();
+ log.info("Fetching all the files to be copied");
+ Map<Path, FileStatus> mapOfPathsToCopy = getFilePaths();
+
+ log.info("Fetching copyable file builders from their respective file sets and adding to collection of copy entities");
+ for (CopyableFile.Builder builder : getCopyableFilesFromPaths(mapOfPathsToCopy, configuration)) {
+ CopyableFile fileEntity =
+ builder.fileSet(fileSet).datasetOutputPath(this.fs.getUri().getPath()).build();
+ fileEntity.setSourceData(getSourceDataset());
+ fileEntity.setDestinationData(getDestinationDataset());
+ copyEntities.add(fileEntity);
+ }
+ return copyEntities;
+ }
+
+ /**
+ * Get builders for a {@link CopyableFile} for each file referred to by a {@link org.apache.hadoop.hive.metastore.api.StorageDescriptor}.
+ */
+ List<CopyableFile.Builder> getCopyableFilesFromPaths(Map<Path, FileStatus> paths, CopyConfiguration configuration) throws IOException {
+
+ List<CopyableFile.Builder> builders = Lists.newArrayList();
+ List<SourceAndDestination> dataFiles = Lists.newArrayList();
+ Configuration hadoopConfiguration = new Configuration();
+ FileSystem actualSourceFs;
+
+ for(Map.Entry<Path, FileStatus> entry : paths.entrySet()) {
+ dataFiles.add(new SourceAndDestination(entry.getValue(), this.fs.makeQualified(entry.getKey())));
+ }
+
+ for(SourceAndDestination sourceAndDestination : dataFiles) {
+ actualSourceFs = sourceAndDestination.getSource().getPath().getFileSystem(hadoopConfiguration);
+
+ // TODO Add ancestor owner and permissions in future releases
+ builders.add(CopyableFile.fromOriginAndDestination(actualSourceFs, sourceAndDestination.getSource(),
+ sourceAndDestination.getDestination(), configuration));
+ }
+ return builders;
+ }
+ /**
+ * Finds all files read by the Iceberg table including metadata json file, manifest files, nested manifest file paths and actual data files.
+ * Returns a map of path, file status for each file that needs to be copied
+ */
+ Map<Path, FileStatus> getFilePaths() throws IOException {
+ Map<Path, FileStatus> result = Maps.newHashMap();
+ IcebergTable icebergTable = this.getIcebergTable();
+ IcebergSnapshotInfo icebergSnapshotInfo = icebergTable.getCurrentSnapshotInfo();
+
+ log.info("Fetching all file paths for the current snapshot of the Iceberg table");
+ List<String> pathsToCopy = icebergSnapshotInfo.getAllPaths();
+
+ for(String pathString : pathsToCopy) {
+ Path path = new Path(pathString);
+ result.put(path, this.fs.getFileStatus(path));
+ }
+ return result;
+ }
+
+ DatasetDescriptor getSourceDataset() {
+ return getDatasetDescriptor(sourceMetastoreURI);
+ }
+
+ DatasetDescriptor getDestinationDataset() {
+ return getDatasetDescriptor(targetMetastoreURI);
+ }
+
+ @NotNull
+ private DatasetDescriptor getDatasetDescriptor(Optional<String> stringMetastoreURI) {
+ String destinationTable = this.getDbName() + "." + this.getInputTableName();
+
+ URI hiveMetastoreURI = null;
+ if (stringMetastoreURI.isPresent()) {
+ hiveMetastoreURI = URI.create(stringMetastoreURI.get());
+ }
+
+ DatasetDescriptor destinationDataset =
+ new DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG, hiveMetastoreURI, destinationTable);
+ destinationDataset.addMetadata(DatasetConstants.FS_URI, this.getFs().getUri().toString());
Review Comment:
likely to require tweaking eventually, esp. since not all iceberg catalogs
are hive metastores. I'm also unclear what it means in cases where the HMS URI
is null...
nonetheless, may be fine in the very near term for this particular commit
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import com.google.api.client.util.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyContext;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.PreserveAttributes;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.TableOperations;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.*;
+
+
+public class IcebergDatasetTest {
+
+ static final String METADATA_PATH = "/root/iceberg/test/metadata";
+ static final String MANIFEST_PATH = "/root/iceberg/test/metadata/test_manifest";
+ static final String MANIFEST_FILE_PATH1 = "/root/iceberg/test/metadata/test_manifest/data/a";
+ static final String MANIFEST_FILE_PATH2 = "/root/iceberg/test/metadata/test_manifest/data/b";
+
+ @Test
+ public void testGetFilePaths() throws IOException {
+
+ List<String> pathsToCopy = new ArrayList<>();
+ pathsToCopy.add(MANIFEST_FILE_PATH1);
+ pathsToCopy.add(MANIFEST_FILE_PATH2);
+ Map<Path, FileStatus> expected = Maps.newHashMap();
+ expected.put(new Path(MANIFEST_FILE_PATH1), null);
+ expected.put(new Path(MANIFEST_FILE_PATH2), null);
+
+ IcebergTable icebergTable = Mockito.mock(IcebergTable.class);
+ FileSystem fs = Mockito.mock(FileSystem.class);
+ IcebergSnapshotInfo icebergSnapshotInfo = Mockito.mock(IcebergSnapshotInfo.class);
Review Comment:
would be nice to abstract all of this to a helper:
```
IcebergTable mockIcebergTable(pathsToCopy, whateverElse...)
```
we'll likely need more than a single test, and a helper will make each subsequent one easier to write
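The suggested helper might look roughly like this (a sketch against the PR's own test classes; the name `mockIcebergTable` and its signature are hypothetical):

```java
// Sketch: centralize the Mockito setup from testGetFilePaths, so each
// new test only declares the paths it cares about.
private static IcebergTable mockIcebergTable(List<String> pathsToCopy) {
  IcebergTable icebergTable = Mockito.mock(IcebergTable.class);
  IcebergSnapshotInfo snapshotInfo = Mockito.mock(IcebergSnapshotInfo.class);
  Mockito.when(icebergTable.getCurrentSnapshotInfo()).thenReturn(snapshotInfo);
  Mockito.when(snapshotInfo.getAllPaths()).thenReturn(pathsToCopy);
  return icebergTable;
}
```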
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Finds {@link IcebergDataset}s. Will look for tables in a database using a {@link IcebergCatalog},
+ * and creates a {@link IcebergDataset} for each one.
+ */
+@Slf4j
+@AllArgsConstructor
+public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {
+
+ public static final String ICEBERG_DATASET_PREFIX = "iceberg.dataset";
+ public static final String ICEBERG_METASTORE_URI_KEY = ICEBERG_DATASET_PREFIX + ".hive.metastore.uri";
+ public static final String ICEBERG_DB_NAME = DatasetConstants.PLATFORM_ICEBERG + ".database.name";
+ public static final String ICEBERG_TABLE_NAME = DatasetConstants.PLATFORM_ICEBERG + ".table.name";
+
+ private String dbName;
+ private String tblName;
+ private final Properties properties;
+ protected final FileSystem fs;
+
+ @Override
+ public List<IcebergDataset> findDatasets() throws IOException {
+ List<IcebergDataset> matchingDatasets = new ArrayList<>();
+ /*
+ * Both Iceberg database name and table name are mandatory,
+ * since we are currently only supporting Hive Catalog based Iceberg tables.
+ * The design will support defaults and other catalogs in future releases.
+ */
+ if (properties.getProperty(ICEBERG_DB_NAME) == null || properties.getProperty(ICEBERG_TABLE_NAME) == null) {
+ throw new IOException("Iceberg database name or Iceberg table name is missing");
+ }
+ this.dbName = properties.getProperty(ICEBERG_DB_NAME);
+ this.tblName = properties.getProperty(ICEBERG_TABLE_NAME);
+
+ Configuration configuration = HadoopUtils.getConfFromProperties(properties);
+
+ IcebergCatalog icebergCatalog = IcebergCatalogFactory.create(configuration);
+ IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
+ // Currently, we only support one dataset per iceberg table
+ matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergTable, properties, fs));
+ log.info("Found {} matching datasets: {}", matchingDatasets.size(), matchingDatasets);
+
+ return matchingDatasets;
+ }
+
+ @Override
+ public Path commonDatasetRoot() {
+ return null;
+ }
+
+ @Override
+ public Iterator<IcebergDataset> getDatasetsIterator() throws IOException {
+ return findDatasets().iterator();
+ }
+ protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergTable icebergTable, Properties properties, FileSystem fs) {
Review Comment:
minor, but you could pass in the `IcebergCatalog` and call `openTable()`
within to gain the `IcebergTable` you'll eventually pass to the
`IcebergDataset` ctor
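The suggested refactoring might look roughly like this (a sketch against the PR's own classes; exceptions and signatures may differ):

```java
// Sketch: let the factory method open the table itself, so callers only
// supply the catalog rather than a pre-opened IcebergTable.
protected IcebergDataset createIcebergDataset(String dbName, String tblName,
    IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) throws IOException {
  IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
  return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
}
```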
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -0,0 +1,157 @@
+public class IcebergDatasetTest {
+
+ static final String METADATA_PATH = "/root/iceberg/test/metadata";
+ static final String MANIFEST_PATH = "/root/iceberg/test/metadata/test_manifest";
+ static final String MANIFEST_FILE_PATH1 = "/root/iceberg/test/metadata/test_manifest/data/a";
+ static final String MANIFEST_FILE_PATH2 = "/root/iceberg/test/metadata/test_manifest/data/b";
+
+ @Test
+ public void testGetFilePaths() throws IOException {
+
+ List<String> pathsToCopy = new ArrayList<>();
+ pathsToCopy.add(MANIFEST_FILE_PATH1);
+ pathsToCopy.add(MANIFEST_FILE_PATH2);
+ Map<Path, FileStatus> expected = Maps.newHashMap();
+ expected.put(new Path(MANIFEST_FILE_PATH1), null);
+ expected.put(new Path(MANIFEST_FILE_PATH2), null);
+
+ IcebergTable icebergTable = Mockito.mock(IcebergTable.class);
+ FileSystem fs = Mockito.mock(FileSystem.class);
+ IcebergSnapshotInfo icebergSnapshotInfo = Mockito.mock(IcebergSnapshotInfo.class);
+
+ Mockito.when(icebergTable.getCurrentSnapshotInfo()).thenReturn(icebergSnapshotInfo);
+ Mockito.when(icebergSnapshotInfo.getAllPaths()).thenReturn(pathsToCopy);
Review Comment:
since it's just a POJO, I'd expect you to just return a value, rather than
make a mock... although perhaps not a big difference either way...
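Taking the reviewer's suggestion, the stub could return a real value object instead of a mock (sketch only; the actual `IcebergSnapshotInfo` constructor arguments are assumed here and would need checking):

```java
// Hypothetical: if IcebergSnapshotInfo is a plain value object, construct it
// directly rather than stubbing getAllPaths() on a mock. The elided ctor
// args (snapshot id, timestamp, ...) depend on the real class definition.
IcebergSnapshotInfo icebergSnapshotInfo =
    new IcebergSnapshotInfo(/* snapshotId, timestamp, ... */ pathsToCopy);
Mockito.when(icebergTable.getCurrentSnapshotInfo()).thenReturn(icebergSnapshotInfo);
```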
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+ private final String dbName;
+ private final String inputTableName;
+ private IcebergTable icebergTable;
+ protected Properties properties;
+ protected FileSystem fs;
+
+ private Optional<String> sourceMetastoreURI;
+ private Optional<String> targetMetastoreURI;
+
+ /** Target metastore URI */
+ public static final String TARGET_METASTORE_URI_KEY =
+ IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.metastore.uri";
+ /** Target database name */
+ public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+ public IcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem fs) {
+ this.dbName = db;
+ this.inputTableName = table;
+ this.icebergTable = icebergTbl;
+ this.properties = properties;
+ this.fs = fs;
+ this.sourceMetastoreURI =
+ Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_METASTORE_URI_KEY));
+ this.targetMetastoreURI =
+ Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+ }
+
+ public IcebergDataset(String db, String table) {
+ this.dbName = db;
+ this.inputTableName = table;
+ }
+
+ /**
+ * Represents a source {@link FileStatus} and a {@link Path} destination.
+ */
+ @Data
+ private static class SourceAndDestination {
+ private final FileStatus source;
+ private final Path destination;
+ }
+
+ @Override
+ public String datasetURN() {
+ return this.dbName + "." + this.inputTableName;
+ }
+
+ @Override
+ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration) {
+ return getCopyEntities(configuration);
+ }
+ /**
+ * Finds all files read by the table and generates CopyableFiles.
+ * For the specific semantics see {@link #getCopyEntities}.
+ */
+ @Override
+ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration,
+ Comparator<FileSet<CopyEntity>> prioritizer, PushDownRequestor<FileSet<CopyEntity>> requestor) {
+ return getCopyEntities(configuration);
+ }
+
+ /**
+ * Finds all files read by the table and generates {@link CopyEntity}s for duplicating the table.
+ */
+ Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration configuration) {
+ FileSet<CopyEntity> fileSet = new IcebergTableFileSet(this.getInputTableName(), this, configuration);
+ return Iterators.singletonIterator(fileSet);
+ }
+
+ /**
+ * Finds all files read by the table file set and generates {@link CopyEntity}s for duplicating the table.
+ */
+ @VisibleForTesting
+ Collection<CopyEntity> generateCopyEntitiesForTableFileSet(CopyConfiguration configuration) throws IOException {
+ String fileSet = this.getInputTableName();
+ List<CopyEntity> copyEntities = Lists.newArrayList();
+ log.info("Fetching all the files to be copied");
+ Map<Path, FileStatus> mapOfPathsToCopy = getFilePaths();
+
+ log.info("Fetching copyable file builders from their respective file sets and adding to collection of copy entities");
+ for (CopyableFile.Builder builder : getCopyableFilesFromPaths(mapOfPathsToCopy, configuration)) {
+ CopyableFile fileEntity =
+ builder.fileSet(fileSet).datasetOutputPath(this.fs.getUri().getPath()).build();
+ fileEntity.setSourceData(getSourceDataset());
+ fileEntity.setDestinationData(getDestinationDataset());
+ copyEntities.add(fileEntity);
+ }
+ return copyEntities;
+ }
+
+ /**
+ * Get builders for a {@link CopyableFile} for each file referred to by a {@link org.apache.hadoop.hive.metastore.api.StorageDescriptor}.
+ */
+ List<CopyableFile.Builder> getCopyableFilesFromPaths(Map<Path, FileStatus> paths, CopyConfiguration configuration) throws IOException {
+
+ List<CopyableFile.Builder> builders = Lists.newArrayList();
+ List<SourceAndDestination> dataFiles = Lists.newArrayList();
+ Configuration hadoopConfiguration = new Configuration();
Review Comment:
unsure, but would this be where we'd point to whatever target cluster we're
supposed to copy to? I believe we'll eventually need more than empty config...
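One possibility, mirroring what `IcebergDatasetFinder` already does (sketch only; whether the job `Properties` carry target-cluster settings here is an assumption):

```java
// Sketch: derive the Hadoop Configuration from the job properties rather
// than starting from an empty one, as IcebergDatasetFinder.findDatasets() does.
Configuration hadoopConfiguration = HadoopUtils.getConfFromProperties(this.properties);
```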
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -0,0 +1,89 @@
+
+/**
+ * Finds {@link IcebergDataset}s. Will look for tables in a database using a {@link IcebergCatalog},
+ * and creates a {@link IcebergDataset} for each one.
+ */
+@Slf4j
+@AllArgsConstructor
+public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {
+
+ public static final String ICEBERG_DATASET_PREFIX = "iceberg.dataset";
+ public static final String ICEBERG_METASTORE_URI_KEY = ICEBERG_DATASET_PREFIX + ".hive.metastore.uri";
Review Comment:
also, I don't see this being used below (although I do expect we'll
eventually need to incorporate it)
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -0,0 +1,157 @@
+public class IcebergDatasetTest {
+
+ static final String METADATA_PATH = "/root/iceberg/test/metadata";
+ static final String MANIFEST_PATH = "/root/iceberg/test/metadata/test_manifest";
+ static final String MANIFEST_FILE_PATH1 = "/root/iceberg/test/metadata/test_manifest/data/a";
+ static final String MANIFEST_FILE_PATH2 = "/root/iceberg/test/metadata/test_manifest/data/b";
+
+ @Test
+ public void testGetFilePaths() throws IOException {
+
+ List<String> pathsToCopy = new ArrayList<>();
+ pathsToCopy.add(MANIFEST_FILE_PATH1);
+ pathsToCopy.add(MANIFEST_FILE_PATH2);
+ Map<Path, FileStatus> expected = Maps.newHashMap();
+ expected.put(new Path(MANIFEST_FILE_PATH1), null);
+ expected.put(new Path(MANIFEST_FILE_PATH2), null);
+
+ IcebergTable icebergTable = Mockito.mock(IcebergTable.class);
+ FileSystem fs = Mockito.mock(FileSystem.class);
+ IcebergSnapshotInfo icebergSnapshotInfo = Mockito.mock(IcebergSnapshotInfo.class);
+
+ Mockito.when(icebergTable.getCurrentSnapshotInfo()).thenReturn(icebergSnapshotInfo);
+ Mockito.when(icebergSnapshotInfo.getAllPaths()).thenReturn(pathsToCopy);
+ IcebergDataset icebergDataset = new IcebergDataset("test_db_name", "test_tbl_name", icebergTable, new Properties(), fs);
+
+ Map<Path, FileStatus> actual = icebergDataset.getFilePaths();
+ Assert.assertEquals(actual, expected);
+ }
+
+ /**
+ * Test case to copy all the file paths for a mocked iceberg table. This is a full copy overwriting everything on the destination
+ */
+ @Test
+ public void testGenerateCopyEntitiesForTableFileSet() throws IOException, URISyntaxException {
+
+ FileSystem fs = Mockito.mock(FileSystem.class);
+ String test_db_name = "test_db_name";
+ String test_table_name = "test_tbl_name";
+ Set<String> setOfFilePaths = new HashSet<>(Arrays.asList(METADATA_PATH, MANIFEST_PATH, MANIFEST_FILE_PATH1, MANIFEST_FILE_PATH2));
+
+ Properties properties = new Properties();
+ properties.setProperty("data.publisher.final.dir", "/test");
+
+ CopyConfiguration copyConfiguration = CopyConfiguration.builder(null, properties)
+ .preserve(PreserveAttributes.fromMnemonicString(""))
+ .copyContext(new CopyContext())
+ .build();
+ TableOperations tableOperations = Mockito.mock(TableOperations.class);
+
+ IcebergTable icebergTable = new MockedIcebergTable(tableOperations);
+ IcebergDataset icebergDataset = new IcebergDataset(test_db_name, test_table_name, icebergTable, new Properties(), fs);
+
+ FileStatus fileStatus1 = new FileStatus();
+ fileStatus1.setPath(new Path(METADATA_PATH));
+ FileStatus fileStatus2 = new FileStatus();
+ fileStatus2.setPath(new Path(MANIFEST_PATH));
+ FileStatus fileStatus3 = new FileStatus();
+ fileStatus3.setPath(new Path(MANIFEST_FILE_PATH1));
+ FileStatus fileStatus4 = new FileStatus();
+ fileStatus4.setPath(new Path(MANIFEST_FILE_PATH2));
+
+ Path path1 = new Path(METADATA_PATH);
+ Path path2 = new Path(MANIFEST_PATH);
+ Path path3 = new Path(MANIFEST_FILE_PATH1);
+ Path path4 = new Path(MANIFEST_FILE_PATH2);
+
+
+ Mockito.when(fs.makeQualified(any(Path.class))).thenReturn(new Path("/root/iceberg/test/destination/sub_path_destination"));
+ Mockito.when(fs.getFileStatus(path1)).thenReturn(fileStatus1);
+ Mockito.when(fs.getFileStatus(path2)).thenReturn(fileStatus2);
+ Mockito.when(fs.getFileStatus(path3)).thenReturn(fileStatus3);
+ Mockito.when(fs.getFileStatus(path4)).thenReturn(fileStatus4);
+ Mockito.when(fs.getUri()).thenReturn(new URI(null, null, "/root/iceberg/test/output", null));
+
+ Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntitiesForTableFileSet(copyConfiguration);
+ Assert.assertEquals(copyEntities.size(), setOfFilePaths.size());
+ for (CopyEntity copyEntity : copyEntities) {
+ String json = copyEntity.toString();
+ JsonObject jsonObject = new Gson().fromJson(json, JsonObject.class);
+ JsonObject objectData =
+ jsonObject.getAsJsonObject("object-data").getAsJsonObject("origin").getAsJsonObject("object-data");
+ JsonObject pathObject = objectData.getAsJsonObject("path").getAsJsonObject("object-data").getAsJsonObject("uri");
+ String filepath = pathObject.getAsJsonPrimitive("object-data").getAsString();
+ Assert.assertTrue(setOfFilePaths.contains(filepath));
+ setOfFilePaths.remove(filepath);
+ }
+ Assert.assertTrue(setOfFilePaths.isEmpty());
Review Comment:
again, let's use a helper. we definitely prefer not to repeat ourselves as
we write more tests that similarly wish to validate against expectations.
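Extracting the validation loop shown in the test into a helper might look roughly like this (a sketch; the JSON field names simply mirror the `CopyEntity` serialized form this test already walks):

```java
// Sketch of a reusable assertion helper, lifted from the loop in
// testGenerateCopyEntitiesForTableFileSet.
private static void verifyCopyEntityPaths(Collection<CopyEntity> copyEntities, Set<String> expectedPaths) {
  Set<String> remaining = new HashSet<>(expectedPaths);
  Assert.assertEquals(copyEntities.size(), expectedPaths.size());
  for (CopyEntity copyEntity : copyEntities) {
    JsonObject jsonObject = new Gson().fromJson(copyEntity.toString(), JsonObject.class);
    JsonObject objectData =
        jsonObject.getAsJsonObject("object-data").getAsJsonObject("origin").getAsJsonObject("object-data");
    JsonObject pathObject = objectData.getAsJsonObject("path").getAsJsonObject("object-data").getAsJsonObject("uri");
    String filepath = pathObject.getAsJsonPrimitive("object-data").getAsString();
    Assert.assertTrue(remaining.remove(filepath), "unexpected path: " + filepath);
  }
  Assert.assertTrue(remaining.isEmpty(), "paths not covered: " + remaining);
}
```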
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -0,0 +1,157 @@
+public class IcebergDatasetTest {
+
+ private static class MockedIcebergTable extends IcebergTable {
+
+ public MockedIcebergTable(TableOperations tableOps) {
+ super(tableOps);
Review Comment:
no need for a `TableOperations` param: if you find it won't work to just
pass `super(null)`, go ahead and instantiate the mock herein.
Instead, let's have params that receive the various paths, so we're able to
write more than this one test with its hard-coded values.
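One possible shape for that suggestion (a sketch only — the constructor arity and method names below are assumptions, not the PR's actual code; the real class would extend `IcebergTable`, while this stand-in models just the path-holding behavior):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of a parameterized mock table: the fixture paths arrive through
// the constructor, so other tests can reuse the class with different values
// instead of relying on hard-coded constants.
public class MockedIcebergTableSketch {
  private final List<String> allPaths;

  public MockedIcebergTableSketch(String metadataPath, String manifestPath,
      List<String> manifestFilePaths) {
    this.allPaths = new ArrayList<>();
    this.allPaths.add(metadataPath);
    this.allPaths.add(manifestPath);
    this.allPaths.addAll(manifestFilePaths);
  }

  // Mirrors the role of IcebergSnapshotInfo.getAllPaths() in the test setup.
  public List<String> getAllPaths() {
    return allPaths;
  }

  public static void main(String[] args) {
    MockedIcebergTableSketch table = new MockedIcebergTableSketch(
        "/root/iceberg/test/metadata",
        "/root/iceberg/test/metadata/test_manifest",
        Arrays.asList(
            "/root/iceberg/test/metadata/test_manifest/data/a",
            "/root/iceberg/test/metadata/test_manifest/data/b"));
    System.out.println(table.getAllPaths().size()); // prints 4
  }
}
```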
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import com.google.api.client.util.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyContext;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.PreserveAttributes;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.TableOperations;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.*;
+
+
+public class IcebergDatasetTest {
+
+ static final String METADATA_PATH = "/root/iceberg/test/metadata";
+ static final String MANIFEST_PATH = "/root/iceberg/test/metadata/test_manifest";
+ static final String MANIFEST_FILE_PATH1 = "/root/iceberg/test/metadata/test_manifest/data/a";
+ static final String MANIFEST_FILE_PATH2 = "/root/iceberg/test/metadata/test_manifest/data/b";
+
+ @Test
+ public void testGetFilePaths() throws IOException {
+
+ List<String> pathsToCopy = new ArrayList<>();
+ pathsToCopy.add(MANIFEST_FILE_PATH1);
+ pathsToCopy.add(MANIFEST_FILE_PATH2);
+ Map<Path, FileStatus> expected = Maps.newHashMap();
+ expected.put(new Path(MANIFEST_FILE_PATH1), null);
+ expected.put(new Path(MANIFEST_FILE_PATH2), null);
+
+ IcebergTable icebergTable = Mockito.mock(IcebergTable.class);
+ FileSystem fs = Mockito.mock(FileSystem.class);
+ IcebergSnapshotInfo icebergSnapshotInfo = Mockito.mock(IcebergSnapshotInfo.class);
+
+ Mockito.when(icebergTable.getCurrentSnapshotInfo()).thenReturn(icebergSnapshotInfo);
+ Mockito.when(icebergSnapshotInfo.getAllPaths()).thenReturn(pathsToCopy);
+ IcebergDataset icebergDataset = new IcebergDataset("test_db_name", "test_tbl_name", icebergTable, new Properties(), fs);
+
+ Map<Path, FileStatus> actual = icebergDataset.getFilePaths();
+ Assert.assertEquals(actual, expected);
+ }
+
+ /**
+ * Test case to copy all the file paths for a mocked iceberg table. This is a full copy overwriting everything on the destination.
+ */
+ @Test
+ public void testGenerateCopyEntitiesForTableFileSet() throws IOException, URISyntaxException {
+
+ FileSystem fs = Mockito.mock(FileSystem.class);
+ String test_db_name = "test_db_name";
+ String test_table_name = "test_tbl_name";
+ Set<String> setOfFilePaths = new HashSet<>(Arrays.asList(METADATA_PATH, MANIFEST_PATH, MANIFEST_FILE_PATH1, MANIFEST_FILE_PATH2));
+
+ Properties properties = new Properties();
+ properties.setProperty("data.publisher.final.dir", "/test");
+
+ CopyConfiguration copyConfiguration = CopyConfiguration.builder(null, properties)
+ .preserve(PreserveAttributes.fromMnemonicString(""))
+ .copyContext(new CopyContext())
+ .build();
+ TableOperations tableOperations = Mockito.mock(TableOperations.class);
+
+ IcebergTable icebergTable = new MockedIcebergTable(tableOperations);
+ IcebergDataset icebergDataset = new IcebergDataset(test_db_name, test_table_name, icebergTable, new Properties(), fs);
+
+ FileStatus fileStatus1 = new FileStatus();
+ fileStatus1.setPath(new Path(METADATA_PATH));
+ FileStatus fileStatus2 = new FileStatus();
+ fileStatus2.setPath(new Path(MANIFEST_PATH));
+ FileStatus fileStatus3 = new FileStatus();
+ fileStatus3.setPath(new Path(MANIFEST_FILE_PATH1));
+ FileStatus fileStatus4 = new FileStatus();
+ fileStatus4.setPath(new Path(MANIFEST_FILE_PATH2));
+
+ Path path1 = new Path(METADATA_PATH);
+ Path path2 = new Path(MANIFEST_PATH);
+ Path path3 = new Path(MANIFEST_FILE_PATH1);
+ Path path4 = new Path(MANIFEST_FILE_PATH2);
+
+
+ Mockito.when(fs.makeQualified(any(Path.class))).thenReturn(new Path("/root/iceberg/test/destination/sub_path_destination"));
+ Mockito.when(fs.getFileStatus(path1)).thenReturn(fileStatus1);
+ Mockito.when(fs.getFileStatus(path2)).thenReturn(fileStatus2);
+ Mockito.when(fs.getFileStatus(path3)).thenReturn(fileStatus3);
+ Mockito.when(fs.getFileStatus(path4)).thenReturn(fileStatus4);
+ Mockito.when(fs.getUri()).thenReturn(new URI(null, null, "/root/iceberg/test/output", null));
+
+ Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntitiesForTableFileSet(copyConfiguration);
+ Assert.assertEquals(copyEntities.size(), setOfFilePaths.size());
+ for (CopyEntity copyEntity : copyEntities) {
+ String json = copyEntity.toString();
+ JsonObject jsonObject = new Gson().fromJson(json, JsonObject.class);
+ JsonObject objectData = jsonObject.getAsJsonObject("object-data").getAsJsonObject("origin").getAsJsonObject("object-data");
+ JsonObject pathObject = objectData.getAsJsonObject("path").getAsJsonObject("object-data").getAsJsonObject("uri");
+ String filepath = pathObject.getAsJsonPrimitive("object-data").getAsString();
+ Assert.assertTrue(setOfFilePaths.contains(filepath));
+ setOfFilePaths.remove(filepath);
Review Comment:
would this help - https://stackoverflow.com/a/47064441 ?
Issue Time Tracking
-------------------
Worklog Id: (was: 808446)
Time Spent: 0.5h (was: 20m)
> Create work units for Hive Catalog based Iceberg Datasets to support Distcp
> for Iceberg
> ---------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1709
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1709
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: distcp-ng
> Reporter: Meeth Gala
> Assignee: Issac Buenrostro
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> We want to support Distcp for Iceberg based datasets.
> As a pilot, we are starting with Hive Catalog and will expand the
> functionality to cover all Iceberg based datasets.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)