This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a056e0230e89d46ef4ee20659e7a51d2e60b7e3e Author: Piotr Nowojski <[email protected]> AuthorDate: Wed Mar 6 17:04:26 2024 +0100 [FLINK-35768][tests] Add an ITCase for s5cmd --- .../flink/fs/s3/common/MinioTestContainer.java | 14 + .../flink/fs/s3/common/S5CmdOnMinioITCase.java | 356 +++++++++++++++++++++ .../s3hadoop/S5CmdOnHadoopS3FileSystemITCase.java | 25 ++ .../s3presto/S5CmdOnPrestoS3FileSystemITCase.java | 25 ++ 4 files changed, 420 insertions(+) diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/MinioTestContainer.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/MinioTestContainer.java index e8420efe7b1..d313a77880d 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/MinioTestContainer.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/MinioTestContainer.java @@ -33,6 +33,9 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; import org.testcontainers.utility.Base58; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; import java.time.Duration; import java.util.Locale; @@ -141,4 +144,15 @@ public class MinioTestContainer extends GenericContainer<MinioTestContainer> { public String getS3UriForDefaultBucket() { return "s3://" + getDefaultBucketName(); } + + public void writeCredentialsFile(File credentialsFile) throws IOException { + try (FileWriter writer = new FileWriter(credentialsFile)) { + writer.write( + String.format( + "[default]\n" + + "aws_access_key_id = %s\n" + + "aws_secret_access_key = %s\n", + accessKey, secretKey)); + } + } } diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S5CmdOnMinioITCase.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S5CmdOnMinioITCase.java new file mode 100644 index 
00000000000..74362d59c04 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S5CmdOnMinioITCase.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3.common; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.testutils.AllCallbackWrapper; +import org.apache.flink.core.testutils.TestContainerExtension; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.test.junit5.InjectMiniCluster; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.CompressionUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.OperatingSystem; +import org.apache.flink.util.TestLoggerExtension; + +import org.apache.commons.lang3.StringUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.security.DigestInputStream; +import java.security.MessageDigest; +import java.util.Collections; +import java.util.Iterator; + +import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY; +import static org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD; +import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_EXTRA_ARGS; +import static org.apache.flink.util.Preconditions.checkState; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * {@code S5CmdOnMinioITCase} covers a checkpointed job run where the checkpoints are stored in + * Minio and s5cmd is configured for the S3 file system. The implementation verifies that the job + * still produces the expected results after recovering from an injected failure. 
+ */ +@ExtendWith(TestLoggerExtension.class) +public abstract class S5CmdOnMinioITCase { + + private static final int CHECKPOINT_INTERVAL = 100; + + @RegisterExtension + @Order(1) + private static final AllCallbackWrapper<TestContainerExtension<MinioTestContainer>> + MINIO_EXTENSION = + new AllCallbackWrapper<>(new TestContainerExtension<>(MinioTestContainer::new)); + + @RegisterExtension + @Order(2) + private static final MiniClusterExtension MINI_CLUSTER_EXTENSION = + new MiniClusterExtension( + () -> { + final Configuration configuration = createConfiguration(); + FileSystem.initialize(configuration, null); + return new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(4) + .setConfiguration(configuration) + .build(); + }); + + private static Configuration createConfiguration() { + final Configuration config = new Configuration(); + getMinioContainer().setS3ConfigOptions(config); + File credentialsFile = new File(temporaryDirectory, "credentials"); + + try { + // It looks like on the CI machines s5cmd by default uses some other authentication + // mechanism that takes precedence over passing the secret and access keys via + // environment variables. For example, there may be a credentials file in the default + // location with secrets for the real S3 rather than MinIO. To circumvent this, let's + // use our own credentials file with the secrets for MinIO. 
+ checkState(credentialsFile.createNewFile()); + getMinioContainer().writeCredentialsFile(credentialsFile); + config.set( + S5CMD_EXTRA_ARGS, + S5CMD_EXTRA_ARGS.defaultValue() + + " --credentials-file " + + credentialsFile.getAbsolutePath()); + config.set(AbstractS3FileSystemFactory.S5CMD_PATH, getS5CmdPath()); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + + config.set(CHECKPOINTS_DIRECTORY, createS3URIWithSubPath("checkpoints")); + // Effectively disable using ByteStreamStateHandle to ensure s5cmd is being used + config.set(FS_SMALL_FILE_THRESHOLD, MemorySize.parse("1b")); + return config; + } + + @TempDir public static File temporaryDirectory; + + private static MinioTestContainer getMinioContainer() { + return MINIO_EXTENSION.getCustomExtension().getTestContainer(); + } + + @BeforeAll + public static void prepareS5Cmd() throws Exception { + Path s5CmdTgz = Paths.get(temporaryDirectory.getPath(), "s5cmd.tar.gz"); + MessageDigest md = MessageDigest.getInstance("MD5"); + + final URI uri; + String expectedMd5; + switch (OperatingSystem.getCurrentOperatingSystem()) { + case LINUX: + uri = + new URI( + "https://github.com/peak/s5cmd/releases/download/v2.2.2/s5cmd_2.2.2_Linux-64bit.tar.gz"); + expectedMd5 = "66549a8bef5183f6ee65bf793aefca0e"; + break; + case MAC_OS: + uri = + new URI( + "https://github.com/peak/s5cmd/releases/download/v2.2.2/s5cmd_2.2.2_macOS-64bit.tar.gz"); + expectedMd5 = "c90292139a9bb8e6643f8970d858c7b9"; + break; + default: + throw new UnsupportedOperationException( + String.format( + "Unsupported operating system [%s] for this test.", + OperatingSystem.getCurrentOperatingSystem())); + } + + try (InputStream inputStream = uri.toURL().openStream()) { + DigestInputStream digestInputStream = new DigestInputStream(inputStream, md); + Files.copy(digestInputStream, s5CmdTgz, StandardCopyOption.REPLACE_EXISTING); + } + String actualMd5 = digestToHexMd5(md.digest()); + checkState( + expectedMd5.equals(actualMd5), + "Expected md5 
[%s] and actual md5 [%s] differ", + expectedMd5, + actualMd5); + + CompressionUtils.extractTarFile(s5CmdTgz.toString(), temporaryDirectory.getPath()); + } + + private static String digestToHexMd5(byte[] digest) { + StringBuffer sb = new StringBuffer(); + for (byte b : digest) { + sb.append(String.format("%02x", b & 0xff)); + } + return sb.toString(); + } + + @AfterAll + public static void unsetFileSystem() { + FileSystem.initialize(new Configuration(), null); + } + + @Test + public void testS5CmdConfigurationIsUsed(@InjectMiniCluster MiniCluster flinkCluster) + throws Exception { + String moveFrom = getS5CmdPath(); + String moveTo = moveFrom + "-moved"; + new File(moveFrom).renameTo(new File(moveTo)); + + try { + testRecoveryWithS5Cmd(flinkCluster); + } catch (Exception e) { + ExceptionUtils.assertThrowable( + e, throwable -> throwable.getMessage().contains("Unable to find s5cmd")); + } finally { + new File(moveTo).renameTo(new File(moveFrom)); + } + } + + @Test + public void testRecoveryWithS5Cmd(@InjectMiniCluster MiniCluster flinkCluster) + throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + env.enableCheckpointing(CHECKPOINT_INTERVAL); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L)); + env.setStateBackend(new EmbeddedRocksDBStateBackend()); + // Disable changelog, to make sure state is stored in the RocksDB, not in changelog, as + // currently only RocksDB is using s5cmd. 
+ env.enableChangelogStateBackend(false); + + try (CloseableIterator<Record> results = + env.addSource(new FailingSource()) + .keyBy(x -> x.key) + .reduce( + (ReduceFunction<Record>) + (record1, record2) -> { + checkState(record1.key == record2.key); + return new Record( + record1.key, record1.value + record2.value); + }) + .collectAsync()) { + + env.execute(); + + // verify that the max emitted values are exactly the sum of all emitted records + long maxValue1 = 0; + long maxValue2 = 0; + while (results.hasNext()) { + Record next = results.next(); + if (next.key == FailingSource.FIRST_KEY) { + maxValue1 = Math.max(maxValue1, next.value); + } else if (next.key == FailingSource.SECOND_KEY) { + maxValue2 = Math.max(maxValue2, next.value); + } else { + throw new Exception("This shouldn't happen: " + next); + } + } + assertThat(maxValue1) + .isEqualTo( + (FailingSource.LAST_EMITTED_VALUE + 1) + * FailingSource.LAST_EMITTED_VALUE + / 2); + assertThat(maxValue2) + .isEqualTo( + (FailingSource.LAST_EMITTED_VALUE + 1) + * FailingSource.LAST_EMITTED_VALUE); + } + } + + private static class Record { + + public Record() { + this(0, 0); + } + + public Record(int key, int value) { + this.key = key; + this.value = value; + } + + public int key; + public int value; + + @Override + public String toString() { + return String.format("%s(key=%d, value=%d)", Record.class.getSimpleName(), key, value); + } + } + + /** + * Bounded source that emits incremental records at most once per checkpoint. Generates a + * failure once halfway to the end. 
+ */ + private class FailingSource extends RichSourceFunction<Record> implements CheckpointedFunction { + public static final int FIRST_KEY = 1; + public static final int SECOND_KEY = 2; + + public static final int LAST_EMITTED_VALUE = 10; + private static final int FAIL_AFTER_VALUE = LAST_EMITTED_VALUE / 2; + + private ListState<Integer> lastEmittedValueState; + private int lastEmittedValue; + private boolean isRestored = false; + private boolean emitted = false; + + private volatile boolean running = true; + + @Override + public void run(SourceContext ctx) throws Exception { + while (running && lastEmittedValue < LAST_EMITTED_VALUE) { + synchronized (ctx.getCheckpointLock()) { + if (!emitted) { + lastEmittedValue += 1; + ctx.collect(new Record(FIRST_KEY, lastEmittedValue)); + ctx.collect(new Record(SECOND_KEY, 2 * lastEmittedValue)); + emitted = true; + } + } + Thread.sleep(CHECKPOINT_INTERVAL / 20 + 1); + } + } + + @Override + public void cancel() { + running = false; + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + if (!isRestored && lastEmittedValue > FAIL_AFTER_VALUE) { + throw new ExpectedTestException("Time to failover!"); + } + lastEmittedValueState.update(Collections.singletonList(lastEmittedValue)); + emitted = false; + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + lastEmittedValueState = + context.getOperatorStateStore() + .getListState( + new ListStateDescriptor<>("lastEmittedValue", Integer.class)); + + Iterator<Integer> lastEmittedValues = lastEmittedValueState.get().iterator(); + if (lastEmittedValues.hasNext()) { + lastEmittedValue = lastEmittedValues.next(); + isRestored = true; + } + checkState(!lastEmittedValues.hasNext()); + } + } + + private static String createS3URIWithSubPath(String... 
subfolders) { + return getMinioContainer().getS3UriForDefaultBucket() + createSubPath(subfolders); + } + + private static String createSubPath(String... subfolders) { + final String pathSeparator = "/"; + return pathSeparator + StringUtils.join(subfolders, pathSeparator); + } + + private static String getS5CmdPath() { + return Paths.get(temporaryDirectory.getPath(), "s5cmd").toAbsolutePath().toString(); + } +} diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/S5CmdOnHadoopS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/S5CmdOnHadoopS3FileSystemITCase.java new file mode 100644 index 00000000000..bae4d6c0b30 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/S5CmdOnHadoopS3FileSystemITCase.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3hadoop; + +import org.apache.flink.fs.s3.common.HAJobRunOnMinioS3StoreITCase; +import org.apache.flink.fs.s3.common.S5CmdOnMinioITCase; + +/** Runs the {@link S5CmdOnMinioITCase} on the Hadoop S3 file system. 
*/ +public class S5CmdOnHadoopS3FileSystemITCase extends S5CmdOnMinioITCase {} diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/S5CmdOnPrestoS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/S5CmdOnPrestoS3FileSystemITCase.java new file mode 100644 index 00000000000..4ced9d34910 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/S5CmdOnPrestoS3FileSystemITCase.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3presto; + +import org.apache.flink.fs.s3.common.HAJobRunOnMinioS3StoreITCase; +import org.apache.flink.fs.s3.common.S5CmdOnMinioITCase; + +/** Runs the {@link S5CmdOnMinioITCase} on the Presto S3 file system. */ +public class S5CmdOnPrestoS3FileSystemITCase extends S5CmdOnMinioITCase {}
