StephanEwen commented on a change in pull request #14199:
URL: https://github.com/apache/flink/pull/14199#discussion_r532215353
##########
File path:
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
##########
@@ -61,20 +82,83 @@
@ClassRule
public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER = new
MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setNumberTaskManagers(1)
- .setNumberSlotsPerTaskManager(PARALLELISM)
- .build());
+ private static TestingMiniCluster miniCluster;
- //
------------------------------------------------------------------------
+ private static TestingEmbeddedHaServices highAvailabilityServices;
+
+ private static CompletedCheckpointStore checkpointStore;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ highAvailabilityServices = new
HaServices(TestingUtils.defaultExecutor(),
+ () -> checkpointStore,
+ new StandaloneCheckpointIDCounter());
+
+ final Configuration configuration = createConfiguration();
+
+ miniCluster = new TestingMiniCluster(
+ new TestingMiniClusterConfiguration.Builder()
+ .setConfiguration(configuration)
+ .setNumTaskManagers(1)
+ .setNumSlotsPerTaskManager(PARALLELISM)
+
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .build(),
+ () -> highAvailabilityServices);
+
+ miniCluster.start();
+ }
+
+ private static Configuration createConfiguration() throws IOException {
+ final Configuration configuration = new Configuration();
+ configuration.set(CheckpointingOptions.STATE_BACKEND,
"filesystem");
+ final String checkPointDir =
Path.fromLocalFile(TMP_FOLDER.newFolder()).toUri().toString();
+ configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkPointDir);
+ return configuration;
+ }
+
+ @Before
+ public void setup() {
+ checkpointStore = new RecoverableCompletedCheckpointStore();
+ }
+
+ @AfterClass
+ public static void teardownClass() throws Exception {
Review comment:
Similar to the above: a more descriptive method name would be great.
##########
File path:
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
##########
@@ -84,18 +168,33 @@ public void testBoundedTextFileSource() throws Exception {
writeHiddenJunkFiles(testDir);
final FileSource<String> source = FileSource
- .forRecordStreamFormat(new TextLineFormat(),
Path.fromLocalFile(testDir))
- .build();
+ .forRecordStreamFormat(new TextLineFormat(),
Path.fromLocalFile(testDir))
+ .build();
- final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ final StreamExecutionEnvironment env = new
TestStreamEnvironment(miniCluster, PARALLELISM);
env.setParallelism(PARALLELISM);
+ env.enableCheckpointing(10L);
Review comment:
I think the bounded case should explicitly have no checkpointing enabled,
so that recovery re-executes the job fully (goes back to the beginning).
##########
File path:
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
##########
@@ -308,4 +470,88 @@ private static void writeFileAtomically(
assertTrue(stagingFile.renameTo(file));
}
+
+ private enum FailoverType {
+ NONE,
+ TM,
+ JM
+ }
+
+ private static class HaServices extends TestingEmbeddedHaServices {
+ private final Supplier<CompletedCheckpointStore>
completedCheckpointStoreFactory;
+ private final CheckpointIDCounter checkpointIDCounter;
+
+ private HaServices(
+ Executor executor,
+ Supplier<CompletedCheckpointStore>
completedCheckpointStoreFactory,
+ CheckpointIDCounter checkpointIDCounter) {
+ super(executor);
+ this.completedCheckpointStoreFactory =
completedCheckpointStoreFactory;
+ this.checkpointIDCounter = checkpointIDCounter;
+ }
+
+ @Override
+ public CheckpointRecoveryFactory getCheckpointRecoveryFactory()
{
+ return new CheckpointRecoveryFactoryWithSettableStore(
+ completedCheckpointStoreFactory,
+ checkpointIDCounter);
+ }
+ }
+
+ private static class CheckpointRecoveryFactoryWithSettableStore
implements CheckpointRecoveryFactory {
+ private final Supplier<CompletedCheckpointStore>
completedCheckpointStoreFactory;
+ private final CheckpointIDCounter checkpointIDCounter;
+
+ private CheckpointRecoveryFactoryWithSettableStore(
+ Supplier<CompletedCheckpointStore>
completedCheckpointStoreFactory,
+ CheckpointIDCounter checkpointIDCounter) {
+ this.completedCheckpointStoreFactory =
completedCheckpointStoreFactory;
+ this.checkpointIDCounter = checkpointIDCounter;
+ }
+
+ @Override
+ public CompletedCheckpointStore createCheckpointStore(
+ JobID jobId,
+ int maxNumberOfCheckpointsToRetain,
+ ClassLoader userClassLoader) {
+ return completedCheckpointStoreFactory.get();
+ }
+
+ @Override
+ public CheckpointIDCounter createCheckpointIDCounter(JobID
jobId) {
+ return checkpointIDCounter;
+ }
+ }
+
+ private enum RecordCounterToFail {
Review comment:
Can we change this to a regular static class? I thought the consensus was
to not use the enum pattern any more, because it does not really fix a
problem and it confuses many developers.
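For illustration, a minimal sketch of what the "regular static class" form
could look like. The body of `RecordCounterToFail` is not shown in this diff,
so the fields and the method names `reset`, `countAndCheck`, and `recordsSeen`
are assumptions made up for this example, not the PR's actual code.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch: a plain final class holding static state, used in
 * place of an enum. The private constructor prevents instantiation.
 */
final class RecordCounterToFail {

    // Number of records counted so far (shared across all callers).
    private static final AtomicInteger RECORDS = new AtomicInteger();

    // Record count at which the caller should trigger the failure.
    private static volatile int failAfter = Integer.MAX_VALUE;

    private RecordCounterToFail() {}

    /** Resets the counter and sets the threshold for the next test run. */
    static void reset(int failAfterRecords) {
        RECORDS.set(0);
        failAfter = failAfterRecords;
    }

    /** Counts one record; returns true only for the record that hits the threshold. */
    static boolean countAndCheck() {
        return RECORDS.incrementAndGet() == failAfter;
    }

    /** Returns how many records have been counted since the last reset. */
    static int recordsSeen() {
        return RECORDS.get();
    }
}
```

A test would call `RecordCounterToFail.reset(n)` before submitting the job and
trigger the failover when `countAndCheck()` returns true. The class reads like
any other utility holder, which is the readability point made above.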
##########
File path:
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
##########
@@ -61,20 +82,83 @@
@ClassRule
public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER = new
MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setNumberTaskManagers(1)
- .setNumberSlotsPerTaskManager(PARALLELISM)
- .build());
+ private static TestingMiniCluster miniCluster;
- //
------------------------------------------------------------------------
+ private static TestingEmbeddedHaServices highAvailabilityServices;
+
+ private static CompletedCheckpointStore checkpointStore;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ highAvailabilityServices = new
HaServices(TestingUtils.defaultExecutor(),
+ () -> checkpointStore,
+ new StandaloneCheckpointIDCounter());
+
+ final Configuration configuration = createConfiguration();
+
+ miniCluster = new TestingMiniCluster(
+ new TestingMiniClusterConfiguration.Builder()
+ .setConfiguration(configuration)
+ .setNumTaskManagers(1)
+ .setNumSlotsPerTaskManager(PARALLELISM)
+
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .build(),
+ () -> highAvailabilityServices);
+
+ miniCluster.start();
+ }
+
+ private static Configuration createConfiguration() throws IOException {
+ final Configuration configuration = new Configuration();
+ configuration.set(CheckpointingOptions.STATE_BACKEND,
"filesystem");
Review comment:
Does it make a difference here which state backend we use, "filesystem"
versus the default memory state backend?
Otherwise I would leave this out of here, following the principle that a
test should not deal with aspects unrelated to what it actually tests.
##########
File path:
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
##########
@@ -61,20 +82,83 @@
@ClassRule
public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER = new
MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setNumberTaskManagers(1)
- .setNumberSlotsPerTaskManager(PARALLELISM)
- .build());
+ private static TestingMiniCluster miniCluster;
- //
------------------------------------------------------------------------
+ private static TestingEmbeddedHaServices highAvailabilityServices;
+
+ private static CompletedCheckpointStore checkpointStore;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
Review comment:
Nit: It helps readability if the method name describes what the method
does, like `setupMiniCluster()`. The fact that it runs at the beginning of
the class is already expressed by the annotation.