[ https://issues.apache.org/jira/browse/GOBBLIN-2159?focusedWorklogId=939324&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-939324 ]
ASF GitHub Bot logged work on GOBBLIN-2159: ------------------------------------------- Author: ASF GitHub Bot Created on: 21/Oct/24 21:58 Start Date: 21/Oct/24 21:58 Worklog Time Spent: 10m Work Description: phet commented on code in PR #4058: URL: https://github.com/apache/gobblin/pull/4058#discussion_r1809565015 ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java: ########## @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.TableIdentifier; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.google.gson.JsonObject; + +import org.apache.gobblin.data.management.copy.CopyConfiguration; +import org.apache.gobblin.data.management.copy.CopyEntity; +import org.apache.gobblin.data.management.copy.CopyContext; +import org.apache.gobblin.data.management.copy.PreserveAttributes; +import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil; +import org.apache.gobblin.dataset.DatasetDescriptor; + +import static org.mockito.ArgumentMatchers.any; + + +/** Tests for {@link org.apache.gobblin.data.management.copy.iceberg.IcebergPartitionDataset} */ +public class IcebergPartitionDatasetTest { + private IcebergTable srcIcebergTable; + private IcebergTable destIcebergTable; + private TableMetadata srcTableMetadata; + private TableMetadata destTableMetadata; + private FileSystem sourceFs; + private FileSystem targetFs; + private IcebergPartitionDataset icebergPartitionDataset; + private 
MockedStatic<IcebergPartitionFilterPredicateUtil> icebergPartitionFilterPredicateUtil; + private static final String SRC_TEST_DB = "srcTestDB"; + private static final String SRC_TEST_TABLE = "srcTestTable"; + private static final String SRC_WRITE_LOCATION = SRC_TEST_DB + "/" + SRC_TEST_TABLE + "/data"; + private static final String DEST_TEST_DB = "destTestDB"; + private static final String DEST_TEST_TABLE = "destTestTable"; + private static final String DEST_WRITE_LOCATION = DEST_TEST_DB + "/" + DEST_TEST_TABLE + "/data"; + private static final String TEST_ICEBERG_PARTITION_COLUMN_NAME = "testPartition"; + private static final String TEST_ICEBERG_PARTITION_COLUMN_VALUE = "testValue"; + private final Properties copyConfigProperties = new Properties(); + private final Properties properties = new Properties(); + private List<String> srcFilePaths; + + private static final URI SRC_FS_URI; + private static final URI DEST_FS_URI; + + static { + try { + SRC_FS_URI = new URI("abc", "the.source.org", "/", null); + DEST_FS_URI = new URI("xyz", "the.dest.org", "/", null); + } catch (URISyntaxException e) { + throw new RuntimeException("should not occur!", e); + } + } + + @BeforeMethod + public void setUp() throws Exception { + setupSrcFileSystem(); + setupDestFileSystem(); + + TableIdentifier tableIdentifier = TableIdentifier.of(SRC_TEST_DB, SRC_TEST_TABLE); + + srcIcebergTable = Mockito.mock(IcebergTable.class); + destIcebergTable = Mockito.mock(IcebergTable.class); + + srcTableMetadata = Mockito.mock(TableMetadata.class); + destTableMetadata = Mockito.mock(TableMetadata.class); + Mockito.when(destTableMetadata.spec()).thenReturn(Mockito.mock(PartitionSpec.class)); + + Mockito.when(srcIcebergTable.getTableId()).thenReturn(tableIdentifier); + Mockito.when(destIcebergTable.getTableId()).thenReturn(tableIdentifier); + Mockito.when(srcIcebergTable.accessTableMetadata()).thenReturn(srcTableMetadata); + 
Mockito.when(destIcebergTable.accessTableMetadata()).thenReturn(destTableMetadata); + Mockito.when(srcIcebergTable.getDatasetDescriptor(Mockito.any())).thenReturn(Mockito.mock(DatasetDescriptor.class)); + Mockito.when(destIcebergTable.getDatasetDescriptor(Mockito.any())).thenReturn(Mockito.mock(DatasetDescriptor.class)); + + icebergPartitionFilterPredicateUtil = Mockito.mockStatic(IcebergPartitionFilterPredicateUtil.class); + icebergPartitionFilterPredicateUtil + .when(() -> IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(Mockito.anyString(), Mockito.any(), Mockito.any())) + .thenReturn(Optional.of(0)); + + copyConfigProperties.setProperty("data.publisher.final.dir", "/test"); + srcFilePaths = new ArrayList<>(); + } + + @AfterMethod + public void cleanUp() { + srcFilePaths.clear(); + icebergPartitionFilterPredicateUtil.close(); + } + + private void setupSrcFileSystem() throws IOException { + sourceFs = Mockito.mock(FileSystem.class); + Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI); + Mockito.when(sourceFs.makeQualified(any(Path.class))) + .thenAnswer(invocation -> invocation.getArgument(0, Path.class).makeQualified(SRC_FS_URI, new Path("/"))); + Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenAnswer(invocation -> { + Path path = invocation.getArgument(0, Path.class); + Path qualifiedPath = sourceFs.makeQualified(path); + return getFileStatus(qualifiedPath); + }); + } + + private void setupDestFileSystem() throws IOException { + targetFs = Mockito.mock(FileSystem.class); + Mockito.when(targetFs.getUri()).thenReturn(DEST_FS_URI); + Mockito.when(targetFs.makeQualified(any(Path.class))) + .thenAnswer(invocation -> invocation.getArgument(0, Path.class).makeQualified(DEST_FS_URI, new Path("/"))); + // Since we are adding UUID to the file name for every file while creating destination path, + // so return file not found exception if trying to find file status on destination file system + 
Mockito.when(targetFs.getFileStatus(any(Path.class))).thenThrow(new FileNotFoundException()); + } + + @Test + public void testGenerateCopyEntities() throws IOException { + srcFilePaths.add(SRC_WRITE_LOCATION + "/file1.orc"); + List<DataFile> srcDataFiles = getDataFiles(); + Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(srcDataFiles); + + icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, sourceFs, + true); + + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(targetFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); + + Collection<CopyEntity> copyEntities = icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration); + + Assert.assertEquals(copyEntities.size(), 2); + verifyCopyEntities(copyEntities, true); + } + + @Test + public void testGenerateCopyEntitiesWithEmptyDataFiles() throws IOException { + List<DataFile> srcDataFiles = Lists.newArrayList(); + Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(srcDataFiles); + + icebergPartitionDataset = new IcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, sourceFs, + true, TEST_ICEBERG_PARTITION_COLUMN_NAME, TEST_ICEBERG_PARTITION_COLUMN_VALUE); + Collection<CopyEntity> copyEntities = icebergPartitionDataset.generateCopyEntities(targetFs, + Mockito.mock(CopyConfiguration.class)); + + // Since No data files are present, no copy entities should be generated + Assert.assertEquals(copyEntities.size(), 0); + } + + @Test + public void testMultipleCopyEntitiesGenerated() throws IOException { + srcFilePaths.add(SRC_WRITE_LOCATION + "/file1.orc"); + srcFilePaths.add(SRC_WRITE_LOCATION + "/file2.orc"); + srcFilePaths.add(SRC_WRITE_LOCATION + "/file3.orc"); + srcFilePaths.add(SRC_WRITE_LOCATION + "/file4.orc"); + srcFilePaths.add(SRC_WRITE_LOCATION + "/file5.orc"); + + List<DataFile> 
srcDataFiles = getDataFiles(); + Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(srcDataFiles); + + icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, sourceFs, + true); + + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(targetFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); + + Collection<CopyEntity> copyEntities = icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration); + + Assert.assertEquals(copyEntities.size(), 6); + verifyCopyEntities(copyEntities, true); + } + + @Test + public void testWithDifferentSrcAndDestTableWriteLocation() throws IOException { + srcFilePaths.add(SRC_WRITE_LOCATION + "/randomFile--Name.orc"); + Mockito.when(srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "")).thenReturn(SRC_WRITE_LOCATION); + Mockito.when(destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "")).thenReturn(DEST_WRITE_LOCATION); + + List<DataFile> srcDataFiles = getDataFiles(); + Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(srcDataFiles); + + icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, sourceFs, + true); + + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(targetFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); + + List<CopyEntity> copyEntities = + (List<CopyEntity>) icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration); + + Assert.assertEquals(copyEntities.size(), 2); + verifyCopyEntities(copyEntities, false); + } + + private List<DataFile> getDataFiles() throws IOException { + List<DataFile> dataFiles = new ArrayList<>(); + for (String srcFilePath : srcFilePaths) { + DataFile dataFile = Mockito.mock(DataFile.class); + Path dataFilePath = 
new Path(srcFilePath); + Path qualifiedPath = sourceFs.makeQualified(dataFilePath); + Mockito.when(dataFile.path()).thenReturn(dataFilePath.toString()); + Mockito.when(sourceFs.getFileStatus(Mockito.eq(dataFilePath))).thenReturn(getFileStatus(qualifiedPath)); + dataFiles.add(dataFile); + } + return dataFiles; + } + + private static FileStatus getFileStatus(Path path) { + FileStatus fileStatus = new FileStatus(); + fileStatus.setPath(path); + return fileStatus; + } Review Comment: the name `get` is very confusing to associate with `setPath`. Actually, it looks like what you have here is `createEmptyFileStatus(Path path)`, which could also be made `public` in `IcebergDatasetTest`: ``` protected static FileStatus createEmptyFileStatus(String pathString) throws IOException { ... } ``` Issue Time Tracking ------------------- Worklog Id: (was: 939324) Time Spent: 7.5h (was: 7h 20m) > Support Partition Based Copy in Iceberg Distcp > ---------------------------------------------- > > Key: GOBBLIN-2159 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2159 > Project: Apache Gobblin > Issue Type: Task > Reporter: Vivek Rai > Priority: Major > Time Spent: 7.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.20.10#820010)