This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.5.3 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4095de4d45f174b45dd1310a6a99c4fecfb2466a Author: Balaji Varadarajan <[email protected]> AuthorDate: Sun May 10 18:25:54 2020 -0700 [HUDI-820] cleaner repair command should only inspect clean metadata files (#1542) --- hudi-cli/pom.xml | 21 +++ .../apache/hudi/cli/HoodieTableHeaderFields.java | 36 ++++ .../apache/hudi/cli/commands/RepairsCommand.java | 21 ++- .../cli/commands/AbstractShellIntegrationTest.java | 61 ++++++ .../HoodieTestCommitMetadataGenerator.java | 137 ++++++++++++++ .../hudi/cli/commands/TestRepairsCommand.java | 206 +++++++++++++++++++++ .../hudi/common/HoodieTestDataGenerator.java | 17 +- .../apache/hudi/common/util/CollectionUtils.java | 111 +++++++++++ 8 files changed, 601 insertions(+), 9 deletions(-) diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml index a5e358c..7b63e23 100644 --- a/hudi-cli/pom.xml +++ b/hudi-cli/pom.xml @@ -150,6 +150,27 @@ <artifactId>hudi-utilities_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-common</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-client</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-utilities_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> <!-- Logging --> <dependency> diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java new file mode 100644 index 0000000..2e3bc01 --- /dev/null +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.cli; + +/** + * Column header names used when printing CLI result tables. + */ +public class HoodieTableHeaderFields { + + public static final String HEADER_PARTITION = "Partition"; + public static final String HEADER_PARTITION_PATH = HEADER_PARTITION + " Path"; + /** + * Header names used by the repair commands.
+ */ + public static final String HEADER_METADATA_PRESENT = "Metadata Present?"; + public static final String HEADER_REPAIR_ACTION = "Action"; + public static final String HEADER_HOODIE_PROPERTY = "Property"; + public static final String HEADER_OLD_VALUE = "Old Value"; + public static final String HEADER_NEW_VALUE = "New Value"; +} diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java index 7a65336..7feebed 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java @@ -25,10 +25,12 @@ import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.FSUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.avro.AvroRuntimeException; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; import org.apache.spark.launcher.SparkLauncher; @@ -147,14 +149,21 @@ public class RepairsCommand implements CommandMarker { public void removeCorruptedPendingCleanAction() { HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); - HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); - - activeTimeline.filterInflightsAndRequested().getInstants().forEach(instant -> { + HoodieTimeline cleanerTimeline = client.getActiveTimeline().getCleanerTimeline(); + LOG.info("Inspecting pending clean metadata in timeline for corrupted files"); + cleanerTimeline.filterInflightsAndRequested().getInstants().forEach(instant -> { try {
CleanerUtils.getCleanerPlan(client, instant); - } catch (IOException e) { - LOG.warn("try to remove corrupted instant file: " + instant); + } catch (AvroRuntimeException e) { // Avro could not decode the clean plan: treat the file as corrupted + LOG.warn("Corruption found. Trying to remove corrupted clean instant file: " + instant); FSUtils.deleteInstantFile(client.getFs(), client.getMetaPath(), instant); + } catch (IOException ioe) { // corrupted only if "Not an Avro data file"; getMessage() may be null + if (ioe.getMessage() != null && ioe.getMessage().contains("Not an Avro data file")) { + LOG.warn("Corruption found. Trying to remove corrupted clean instant file: " + instant); + FSUtils.deleteInstantFile(client.getFs(), client.getMetaPath(), instant); + } else { + throw new HoodieIOException(ioe.getMessage(), ioe); + } } }); } diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java new file mode 100644 index 0000000..ad81af5 --- /dev/null +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/AbstractShellIntegrationTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.cli.commands; + +import org.apache.hudi.common.HoodieClientTestHarness; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.springframework.shell.Bootstrap; +import org.springframework.shell.core.JLineShellComponent; + +/** + * Base class for CLI integration tests: one shared shell per test class, harness resources per test. + */ +public abstract class AbstractShellIntegrationTest extends HoodieClientTestHarness { + + private static JLineShellComponent shell; // created in startup(), stopped in shutdown() + + @BeforeClass + public static void startup() { + Bootstrap bootstrap = new Bootstrap(); + shell = bootstrap.getJLineShellComponent(); + } + + @AfterClass + public static void shutdown() { + shell.stop(); + } + + @Before + public void setup() throws Exception { + initResources(); + } + + @After + public void teardown() throws Exception { + cleanupResources(); + } + + protected static JLineShellComponent getShell() { // shell shared by every test of the class + return shell; + } +} \ No newline at end of file diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/HoodieTestCommitMetadataGenerator.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/HoodieTestCommitMetadataGenerator.java new file mode 100644 index 0000000..7abad66 --- /dev/null +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/HoodieTestCommitMetadataGenerator.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.cli.commands; + +import org.apache.hudi.common.HoodieTestDataGenerator; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieTestUtils; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTimeline; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Test helper that writes commit instant files holding generated {@link HoodieCommitMetadata}.
+ */ +public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator { + + // default commit metadata value + public static final String DEFAULT_PATH = "path"; + public static final String DEFAULT_FILEID = "fileId"; + public static final int DEFAULT_TOTAL_WRITE_BYTES = 50; + public static final String DEFAULT_PRE_COMMIT = "commit-1"; + public static final int DEFAULT_NUM_WRITES = 10; + public static final int DEFAULT_NUM_UPDATE_WRITES = 15; + public static final int DEFAULT_TOTAL_LOG_BLOCKS = 1; + public static final int DEFAULT_TOTAL_LOG_RECORDS = 10; + public static final int DEFAULT_OTHER_VALUE = 0; + public static final String DEFAULT_NULL_VALUE = "null"; + + /** + * Creates the commit, inflight and requested files for {@code commitTime} with default metadata counts. + */ + public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration) { + createCommitFileWithMetadata(basePath, commitTime, configuration, Option.empty(), Option.empty()); + } + + /** Writes commit, inflight and requested instant files for {@code commitTime}, each with generated metadata. */ + public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration, + Option<Integer> writes, Option<Integer> updates) { + Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime), + HoodieTimeline.makeRequestedCommitFileName(commitTime)) + .forEach(f -> { + Path commitFile = new Path( + basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f); + FSDataOutputStream os = null; + try { + FileSystem fs = FSUtils.getFs(basePath, configuration); + os = fs.create(commitFile, true); + // Generate commitMetadata + HoodieCommitMetadata commitMetadata = generateCommitMetadata(basePath, commitTime, writes, updates); + // Write the JSON as UTF-8 bytes; writeBytes(String) would drop the high byte of each char + os.write(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } finally { + if (null != os) { + try { + os.close(); + } catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e); + } + } + } + }); + } + + /** + * Generates commit metadata with default write counts for a new commit under {@code basePath}. + */ + public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime) throws IOException { + return generateCommitMetadata(basePath, commitTime, Option.empty(), Option.empty()); + } + + /** Creates a new data file in the first two default partitions and returns commit metadata for them. */ + public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime, + Option<Integer> writes, Option<Integer> updates) throws IOException { + String file1P0C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, commitTime); + String file1P1C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, commitTime); + Map<String, List<String>> partitionToFilePaths = new HashMap<>(); + partitionToFilePaths.put(DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0)); + partitionToFilePaths.put(DEFAULT_SECOND_PARTITION_PATH, CollectionUtils.createImmutableList(file1P1C0)); + return generateCommitMetadata(partitionToFilePaths, writes, updates); + } + + /** + * Builds commit metadata with one write stat per listed file; every stat records {@link #DEFAULT_PATH} as its path.
+ */ + private static HoodieCommitMetadata generateCommitMetadata(Map<String, List<String>> partitionToFilePaths, + Option<Integer> writes, Option<Integer> updates) { + HoodieCommitMetadata metadata = new HoodieCommitMetadata(); + partitionToFilePaths.forEach((key, value) -> value.forEach(f -> { + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setPartitionPath(key); + writeStat.setPath(DEFAULT_PATH); + writeStat.setFileId(DEFAULT_FILEID); + writeStat.setTotalWriteBytes(DEFAULT_TOTAL_WRITE_BYTES); + writeStat.setPrevCommit(DEFAULT_PRE_COMMIT); + writeStat.setNumWrites(writes.orElse(DEFAULT_NUM_WRITES)); + writeStat.setNumUpdateWrites(updates.orElse(DEFAULT_NUM_UPDATE_WRITES)); + writeStat.setTotalLogBlocks(DEFAULT_TOTAL_LOG_BLOCKS); + writeStat.setTotalLogRecords(DEFAULT_TOTAL_LOG_RECORDS); + metadata.addWriteStat(key, writeStat); + })); + return metadata; + } +} diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java new file mode 100644 index 0000000..1192915 --- /dev/null +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.cli.commands; + +import org.apache.hudi.cli.HoodieCLI; +import org.apache.hudi.cli.HoodiePrintHelper; +import org.apache.hudi.cli.HoodieTableHeaderFields; +import org.apache.hudi.common.HoodieTestDataGenerator; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.TimelineLayoutVersion; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.FSUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.junit.Before; +import org.junit.Test; +import org.springframework.shell.core.CommandResult; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Test class for {@link RepairsCommand}. + */ +public class TestRepairsCommand extends AbstractShellIntegrationTest { + + private String tablePath; + + @Before + public void init() throws IOException { + String tableName = "test_table"; + tablePath = basePath + File.separator + tableName; + + // Create table and connect + new TableCommand().createTable( + tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), + "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); + } + + /** + * Test case for dry run 'repair addpartitionmeta'.
+ */ + @Test + public void testAddPartitionMetaWithDryRun() throws IOException { + // create commit instant + Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit")); + + // create partition path + String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; + String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH; + String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH; + assertTrue(fs.mkdirs(new Path(partition1))); + assertTrue(fs.mkdirs(new Path(partition2))); + assertTrue(fs.mkdirs(new Path(partition3))); + + // default is dry run. + CommandResult cr = getShell().executeCommand("repair addpartitionmeta"); + assertTrue(cr.isSuccess()); + + // dry run: no partition has metadata yet, so every action is 'None'. + String[][] rows = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath) + .stream() + .map(partition -> new String[] {partition, "No", "None"}) + .toArray(String[][]::new); + String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH, + HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows); + + assertEquals(expected, cr.getResult().toString()); + } + + /** + * Test case for real run 'repair addpartitionmeta'.
+ */ + @Test + public void testAddPartitionMetaWithRealRun() throws IOException { + // create commit instant + Files.createFile(Paths.get(tablePath + "/.hoodie/100.commit")); + + // create partition path + String partition1 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; + String partition2 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH; + String partition3 = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH; + assertTrue(fs.mkdirs(new Path(partition1))); + assertTrue(fs.mkdirs(new Path(partition2))); + assertTrue(fs.mkdirs(new Path(partition3))); + + CommandResult cr = getShell().executeCommand("repair addpartitionmeta --dryrun false"); + assertTrue(cr.isSuccess()); + + List<String> paths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath); + // real run: metadata was missing, so every partition is 'Repaired' + String[][] rows = paths.stream() + .map(partition -> new String[] {partition, "No", "Repaired"}) + .toArray(String[][]::new); + String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH, + HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows); + + assertEquals(expected, cr.getResult().toString()); + + cr = getShell().executeCommand("repair addpartitionmeta"); + assertTrue(cr.isSuccess()); + + // a following dry run sees the metadata written by the real run. + rows = paths.stream() + .map(partition -> new String[] {partition, "Yes", "None"}) + .toArray(String[][]::new); + expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH, + HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows); + assertEquals(expected, cr.getResult().toString()); + } + + /** + * Test case for 'repair overwrite-hoodie-props'.
+ */ + @Test + public void testOverwriteHoodieProperties() throws IOException { + URL newProps = this.getClass().getClassLoader().getResource("table-config.properties"); + assertNotNull("New property file must exist", newProps); + + CommandResult cr = getShell().executeCommand("repair overwrite-hoodie-props --new-props-file " + newProps.getPath()); + assertTrue(cr.isSuccess()); + + Map<String, String> oldProps = HoodieCLI.getTableMetaClient().getTableConfig().getProps(); + // after overwrite, the properties stored in .hoodie equal those read from the new properties file. + Map<String, String> result = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps(); + Properties expectProps = new Properties(); + try (FileInputStream in = new FileInputStream(new File(newProps.getPath()))) { + expectProps.load(in); + } + Map<String, String> expected = expectProps.entrySet().stream() + .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue()))); + assertEquals(expected, result); + + // check result + List<String> allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type", + "hoodie.archivelog.folder", "hoodie.timeline.layout.version"); + String[][] rows = allPropsStr.stream().sorted().map(key -> new String[] {key, + oldProps.getOrDefault(key, null), result.getOrDefault(key, null)}) + .toArray(String[][]::new); + String expect = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY, + HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows); + + assertEquals(expect, cr.getResult().toString()); + } + + /** + * Test case for 'repair corrupted clean files'.
+ */ + @Test + public void testRemoveCorruptedPendingCleanAction() throws IOException { + HoodieCLI.conf = jsc.hadoopConfiguration(); + + Configuration conf = HoodieCLI.conf; + + metaClient = HoodieCLI.getTableMetaClient(); + + // Create four requested clean files + for (int i = 100; i < 104; i++) { + String timestamp = String.valueOf(i); + // Write a corrupted (empty) requested clean file; the command only inspects clean instants + HoodieTestCommitMetadataGenerator.createEmptyCleanRequestedFile(tablePath, timestamp, conf); + } + + // reload meta client + metaClient = HoodieTableMetaClient.reload(metaClient); + // first, there are four pending clean instants + assertEquals(4, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count()); + + CommandResult cr = getShell().executeCommand("repair corrupted clean files"); + assertTrue(cr.isSuccess()); + + // reload meta client: every corrupted clean instant must have been removed + metaClient = HoodieTableMetaClient.reload(metaClient); + assertEquals(0, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count()); + } +} diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java index 99bea63..0ec3bc8 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java @@ -216,12 +216,23 @@ public class HoodieTestDataGenerator { }); } - public static void createCompactionRequestedFile(String basePath, String commitTime, Configuration configuration) + public static void createEmptyCleanRequestedFile(String basePath, String instantTime, Configuration configuration) throws IOException { Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" - + HoodieTimeline.makeRequestedCompactionFileName(commitTime)); + + HoodieTimeline.makeRequestedCleanerFileName(instantTime)); + createEmptyFile(basePath, commitFile, configuration); + } + + public static void
createCompactionRequestedFile(String basePath, String instantTime, Configuration configuration) + throws IOException { + Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + HoodieTimeline.makeRequestedCompactionFileName(instantTime)); + createEmptyFile(basePath, commitFile, configuration); + } + + private static void createEmptyFile(String basePath, Path filePath, Configuration configuration) throws IOException { FileSystem fs = FSUtils.getFs(basePath, configuration); - FSDataOutputStream os = fs.create(commitFile, true); + FSDataOutputStream os = fs.create(filePath, true); os.close(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java new file mode 100644 index 0000000..15e8bea --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and
+ */ + +package org.apache.hudi.common.util; + +import org.apache.hudi.common.util.collection.Pair; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class CollectionUtils { + /** + * Determines whether two iterators contain equal elements in the same order. More specifically, + * this method returns {@code true} if {@code iterator1} and {@code iterator2} contain the same + * number of elements and every element of {@code iterator1} is equal to the corresponding element + * of {@code iterator2}. + * + * <p>Note that this will modify the supplied iterators, since they will have been advanced some + * number of elements forward. + */ + public static boolean elementsEqual(Iterator<?> iterator1, Iterator<?> iterator2) { + while (iterator1.hasNext()) { + if (!iterator2.hasNext()) { + return false; + } + Object o1 = iterator1.next(); + Object o2 = iterator2.next(); + if (!Objects.equals(o1, o2)) { + return false; + } + } + return !iterator2.hasNext(); + } + + @SafeVarargs + public static <T> Set<T> createSet(final T... elements) { + return Stream.of(elements).collect(Collectors.toSet()); + } + + public static <K,V> Map<K, V> createImmutableMap(final K key, final V value) { + return Collections.unmodifiableMap(Collections.singletonMap(key, value)); + } + + @SafeVarargs + public static <T> List<T> createImmutableList(final T... elements) { + return Collections.unmodifiableList(Stream.of(elements).collect(Collectors.toList())); + } + + public static <K,V> Map<K,V> createImmutableMap(final Map<K,V> map) { + return Collections.unmodifiableMap(map); + } + + @SafeVarargs + public static <K,V> Map<K,V> createImmutableMap(final Pair<K,V>... 
elements) { + Map<K,V> map = new HashMap<>(); + for (Pair<K,V> pair: elements) { + map.put(pair.getLeft(), pair.getRight()); + } + return Collections.unmodifiableMap(map); + } + + @SafeVarargs + public static <T> Set<T> createImmutableSet(final T... elements) { + return Collections.unmodifiableSet(createSet(elements)); + } + + public static <T> Set<T> createImmutableSet(final Set<T> set) { + return Collections.unmodifiableSet(set); + } + + public static <T> List<T> createImmutableList(final List<T> list) { + return Collections.unmodifiableList(list); + } + + private static Object[] checkElementsNotNull(Object... array) { + return checkElementsNotNull(array, array.length); + } + + private static Object[] checkElementsNotNull(Object[] array, int length) { + for (int i = 0; i < length; i++) { + checkElementNotNull(array[i], i); + } + return array; + } + + private static Object checkElementNotNull(Object element, int index) { + return Objects.requireNonNull(element, "Element is null at index " + index); + } +} \ No newline at end of file
