[ 
https://issues.apache.org/jira/browse/GOBBLIN-2159?focusedWorklogId=939674&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-939674
 ]

ASF GitHub Bot logged work on GOBBLIN-2159:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Oct/24 10:43
            Start Date: 23/Oct/24 10:43
    Worklog Time Spent: 10m 
      Work Description: Blazer-007 commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1812467027


##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyContext;
+import org.apache.gobblin.data.management.copy.PreserveAttributes;
+import 
org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+
+import static org.mockito.ArgumentMatchers.any;
+
+
+/** Tests for {@link org.apache.gobblin.data.management.copy.iceberg.IcebergPartitionDataset} */
+public class IcebergPartitionDatasetTest {
+  private IcebergTable srcIcebergTable;
+  private IcebergTable destIcebergTable;
+  private TableMetadata srcTableMetadata;
+  private TableMetadata destTableMetadata;
+  private FileSystem sourceFs;
+  private FileSystem targetFs;
+  private IcebergPartitionDataset icebergPartitionDataset;
+  private MockedStatic<IcebergPartitionFilterPredicateUtil> 
icebergPartitionFilterPredicateUtil;
+  private static final String SRC_TEST_DB = "srcTestDB";
+  private static final String SRC_TEST_TABLE = "srcTestTable";
+  private static final String SRC_WRITE_LOCATION = SRC_TEST_DB + "/" + 
SRC_TEST_TABLE + "/data";
+  private static final String DEST_TEST_DB = "destTestDB";
+  private static final String DEST_TEST_TABLE = "destTestTable";
+  private static final String DEST_WRITE_LOCATION = DEST_TEST_DB + "/" + 
DEST_TEST_TABLE + "/data";
+  private static final String TEST_ICEBERG_PARTITION_COLUMN_NAME = 
"testPartition";
+  private static final String TEST_ICEBERG_PARTITION_COLUMN_VALUE = 
"testValue";
+  private final Properties copyConfigProperties = new Properties();
+  private final Properties properties = new Properties();
+  private List<String> srcFilePaths;
+
+  private static final URI SRC_FS_URI;
+  private static final URI DEST_FS_URI;
+
+  static {
+    try {
+      SRC_FS_URI = new URI("abc", "the.source.org", "/", null);
+      DEST_FS_URI = new URI("xyz", "the.dest.org", "/", null);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("should not occur!", e);
+    }
+  }
+
+  @BeforeMethod
+  public void setUp() throws Exception {
+    setupSrcFileSystem();
+    setupDestFileSystem();
+
+    TableIdentifier tableIdentifier = TableIdentifier.of(SRC_TEST_DB, 
SRC_TEST_TABLE);
+
+    srcIcebergTable = Mockito.mock(IcebergTable.class);
+    destIcebergTable = Mockito.mock(IcebergTable.class);
+
+    srcTableMetadata = Mockito.mock(TableMetadata.class);
+    destTableMetadata = Mockito.mock(TableMetadata.class);
+    
Mockito.when(destTableMetadata.spec()).thenReturn(Mockito.mock(PartitionSpec.class));
+
+    Mockito.when(srcIcebergTable.getTableId()).thenReturn(tableIdentifier);
+    Mockito.when(destIcebergTable.getTableId()).thenReturn(tableIdentifier);
+    
Mockito.when(srcIcebergTable.accessTableMetadata()).thenReturn(srcTableMetadata);
+    
Mockito.when(destIcebergTable.accessTableMetadata()).thenReturn(destTableMetadata);
+    
Mockito.when(srcIcebergTable.getDatasetDescriptor(Mockito.any())).thenReturn(Mockito.mock(DatasetDescriptor.class));
+    
Mockito.when(destIcebergTable.getDatasetDescriptor(Mockito.any())).thenReturn(Mockito.mock(DatasetDescriptor.class));
+
+    icebergPartitionFilterPredicateUtil = 
Mockito.mockStatic(IcebergPartitionFilterPredicateUtil.class);
+    icebergPartitionFilterPredicateUtil
+        .when(() -> 
IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(Mockito.anyString(),
 Mockito.any(), Mockito.any()))
+        .thenReturn(Optional.of(0));
+
+    copyConfigProperties.setProperty("data.publisher.final.dir", "/test");
+    srcFilePaths = new ArrayList<>();
+  }
+
+  @AfterMethod
+  public void cleanUp() {
+    srcFilePaths.clear();
+    icebergPartitionFilterPredicateUtil.close();
+  }
+
+  private void setupSrcFileSystem() throws IOException {
+    sourceFs = Mockito.mock(FileSystem.class);
+    Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI);
+    Mockito.when(sourceFs.makeQualified(any(Path.class)))
+        .thenAnswer(invocation -> invocation.getArgument(0, 
Path.class).makeQualified(SRC_FS_URI, new Path("/")));
+    
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenAnswer(invocation -> {
+      Path path = invocation.getArgument(0, Path.class);
+      Path qualifiedPath = sourceFs.makeQualified(path);
+      return getFileStatus(qualifiedPath);
+    });
+  }
+
+  private void setupDestFileSystem() throws IOException {
+    targetFs = Mockito.mock(FileSystem.class);
+    Mockito.when(targetFs.getUri()).thenReturn(DEST_FS_URI);
+    Mockito.when(targetFs.makeQualified(any(Path.class)))
+        .thenAnswer(invocation -> invocation.getArgument(0, 
Path.class).makeQualified(DEST_FS_URI, new Path("/")));
+    // Every destination file name gets a UUID added when the destination path is created,
+    // so any file status lookup on the destination file system throws FileNotFoundException.
+    Mockito.when(targetFs.getFileStatus(any(Path.class))).thenThrow(new 
FileNotFoundException());
+  }
+
+  @Test
+  public void testGenerateCopyEntities() throws IOException {
+    srcFilePaths.add(SRC_WRITE_LOCATION + "/file1.orc");
+    List<DataFile> srcDataFiles = getDataFiles();
+    
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(srcDataFiles);
+
+    icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable, 
destIcebergTable, properties, sourceFs,
+        true);
+
+    CopyConfiguration copyConfiguration =
+        CopyConfiguration.builder(targetFs, 
copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString(""))
+            .copyContext(new CopyContext()).build();
+
+    Collection<CopyEntity> copyEntities = 
icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration);
+
+    Assert.assertEquals(copyEntities.size(), 2);
+    verifyCopyEntities(copyEntities, true);

Review Comment:
   Done





Issue Time Tracking
-------------------

    Worklog Id:     (was: 939674)
    Time Spent: 11h 20m  (was: 11h 10m)

> Support Partition Based Copy in Iceberg Distcp
> ----------------------------------------------
>
>                 Key: GOBBLIN-2159
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2159
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Vivek Rai
>            Priority: Major
>          Time Spent: 11h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to