[
https://issues.apache.org/jira/browse/GOBBLIN-1709?focusedWorklogId=809686&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-809686
]
ASF GitHub Bot logged work on GOBBLIN-1709:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 16/Sep/22 22:48
Start Date: 16/Sep/22 22:48
Worklog Time Spent: 10m
Work Description: meethngala commented on code in PR #3560:
URL: https://github.com/apache/gobblin/pull/3560#discussion_r973492564
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.api.client.util.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyContext;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.PreserveAttributes;
+
+
+public class
+IcebergDatasetTest {
+
+ static final String METADATA_PATH = "/root/iceberg/test/metadata";
+ static final String MANIFEST_PATH =
"/root/iceberg/test/metadata/test_manifest";
+ static final String MANIFEST_LIST_PATH =
"/root/iceberg/test/metadata/test_manifest/data";
+ static final String MANIFEST_FILE_PATH1 =
"/root/iceberg/test/metadata/test_manifest/data/a";
+ static final String MANIFEST_FILE_PATH2 =
"/root/iceberg/test/metadata/test_manifest/data/b";
+
+ @Test
+ public void testGetFilePaths() throws IOException {
+
+ List<String> pathsToCopy = new ArrayList<>();
+ pathsToCopy.add(MANIFEST_FILE_PATH1);
+ pathsToCopy.add(MANIFEST_FILE_PATH2);
+ Map<Path, FileStatus> expected = Maps.newHashMap();
+ expected.put(new Path(MANIFEST_FILE_PATH1), null);
+ expected.put(new Path(MANIFEST_FILE_PATH2), null);
+
+ IcebergTable icebergTable = Mockito.mock(IcebergTable.class);
+ FileSystem fs = Mockito.mock(FileSystem.class);
+ IcebergSnapshotInfo icebergSnapshotInfo =
Mockito.mock(IcebergSnapshotInfo.class);
+
+
Mockito.when(icebergTable.getCurrentSnapshotInfo()).thenReturn(icebergSnapshotInfo);
+ Mockito.when(icebergSnapshotInfo.getAllPaths()).thenReturn(pathsToCopy);
+ IcebergDataset icebergDataset = new IcebergDataset("test_db_name",
"test_tbl_name", icebergTable, new Properties(), fs);
+
+ Map<Path, FileStatus> actual = icebergDataset.getFilePaths();
+ Assert.assertEquals(actual, expected);
+ }
+
+ /**
+ * Test case to generate copy entities for all the file paths for a mocked
iceberg table.
+ * The assumption here is that we create copy entities for all the matching
file paths,
+ * without calculating any difference between the source and destination
+ */
+ @Test
+ public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException,
URISyntaxException {
+
+ FileSystem fs = Mockito.mock(FileSystem.class);
+ String test_db_name = "test_db_name";
+ String test_table_name = "test_tbl_name";
+ String test_qualified_path =
"/root/iceberg/test/destination/sub_path_destination";
+ String test_uri_path = "/root/iceberg/test/output";
+ Properties properties = new Properties();
+ properties.setProperty("data.publisher.final.dir", "/test");
+ List<String> expected = new ArrayList<>(Arrays.asList(METADATA_PATH,
MANIFEST_PATH, MANIFEST_LIST_PATH, MANIFEST_FILE_PATH1, MANIFEST_FILE_PATH2));
+
+ CopyConfiguration copyConfiguration = CopyConfiguration.builder(null,
properties)
+ .preserve(PreserveAttributes.fromMnemonicString(""))
+ .copyContext(new CopyContext())
+ .build();
+
+ List<String> listedManifestFilePaths = Arrays.asList(MANIFEST_FILE_PATH1,
MANIFEST_FILE_PATH2);
+ IcebergSnapshotInfo.ManifestFileInfo manifestFileInfo = new
IcebergSnapshotInfo.ManifestFileInfo(MANIFEST_LIST_PATH,
listedManifestFilePaths);
+ List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles =
Arrays.asList(manifestFileInfo);
+ IcebergTable icebergTable = new MockedIcebergTable(METADATA_PATH,
MANIFEST_PATH, manifestFiles);
+ IcebergDataset icebergDataset = new IcebergDataset(test_db_name,
test_table_name, icebergTable, new Properties(), fs);
+ MockDestinationFileSystem mockDestinationFileSystem = new
MockDestinationFileSystem();
+ mockDestinationFileSystem.addPath(METADATA_PATH);
+ mockDestinationFileSystem.addPath(MANIFEST_PATH);
+ mockDestinationFileSystem.addPath(MANIFEST_LIST_PATH);
+ mockDestinationFileSystem.addPath(MANIFEST_FILE_PATH1);
+ mockDestinationFileSystem.addPath(MANIFEST_FILE_PATH2);
+
+ mockFileSystemMethodCalls(fs, mockDestinationFileSystem.pathToFileStatus,
test_qualified_path, test_uri_path);
+
+ Collection<CopyEntity> copyEntities =
icebergDataset.generateCopyEntities(copyConfiguration);
+ verifyCopyEntities(copyEntities, expected);
+
+ }
+
+ private void verifyCopyEntities(Collection<CopyEntity> copyEntities,
List<String> expected) {
+ List<String> actual = new ArrayList<>();
+ for (CopyEntity copyEntity : copyEntities) {
+ String json = copyEntity.toString();
+ JsonObject jsonObject = new Gson().fromJson(json, JsonObject.class);
+ JsonObject objectData =
+
jsonObject.getAsJsonObject("object-data").getAsJsonObject("origin").getAsJsonObject("object-data");
+ JsonObject pathObject =
objectData.getAsJsonObject("path").getAsJsonObject("object-data").getAsJsonObject("uri");
+ String filepath =
pathObject.getAsJsonPrimitive("object-data").getAsString();
+ actual.add(filepath);
+ }
+ Assert.assertEquals(actual.size(), expected.size());
+ Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray());
+ }
+
+ private void mockFileSystemMethodCalls(FileSystem fs, Map<Path, FileStatus>
pathToFileStatus, String qualifiedPath, String uriPath)
+ throws URISyntaxException, IOException {
+
+ Mockito.when(fs.getUri()).thenReturn(new URI(null, null, uriPath, null));
+ for (Map.Entry<Path, FileStatus> entry : pathToFileStatus.entrySet()) {
+ Path path = entry.getKey();
+ FileStatus fileStatus = entry.getValue();
+ Mockito.when(fs.getFileStatus(path)).thenReturn(fileStatus);
+ Mockito.when(fs.makeQualified(path)).thenReturn(new Path(qualifiedPath));
+ }
+ }
+
+ private static class MockedIcebergTable extends IcebergTable {
+
+ String metadataPath;
+ String manifestListPath;
+ List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles;
+
+ public MockedIcebergTable(String metadataPath, String manifestListPath,
List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles) {
+ super(null);
+ this.metadataPath = metadataPath;
+ this.manifestListPath = manifestListPath;
+ this.manifestFiles = manifestFiles;
+ }
+
+ @Override
+ public IcebergSnapshotInfo getCurrentSnapshotInfo() {
+ Long snapshotId = 0L;
+ Instant timestamp = Instant.ofEpochMilli(0L);
+ return new IcebergSnapshotInfo(snapshotId, timestamp, metadataPath,
manifestListPath, manifestFiles);
+ }
+ }
+
+ private static class MockDestinationFileSystem {
+ Map<Path, FileStatus> pathToFileStatus;
Review Comment:
yeah makes sense.... I have updated it as part of the latest commit
Issue Time Tracking
-------------------
Worklog Id: (was: 809686)
Time Spent: 8h 20m (was: 8h 10m)
> Create work units for Hive Catalog based Iceberg Datasets to support Distcp
> for Iceberg
> ---------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1709
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1709
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: distcp-ng
> Reporter: Meeth Gala
> Assignee: Issac Buenrostro
> Priority: Major
> Time Spent: 8h 20m
> Remaining Estimate: 0h
>
> We want to support Distcp for Iceberg based datasets.
> As a pilot, we are starting with Hive Catalog and will expand the
> functionality to cover all Iceberg based datasets.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)