pnowojski commented on a change in pull request #32:
URL: https://github.com/apache/flink-benchmarks/pull/32#discussion_r717314024



##########
File path: src/main/java/org/apache/flink/benchmark/CheckpointingTimeBenchmark.java
##########
@@ -97,43 +100,119 @@
         })
 @Warmup(iterations = 4)
 @Measurement(iterations = 10)
-public class BufferDebloatedCheckpointTimeBenchmark {
+public class CheckpointingTimeBenchmark {
     public static final int JOB_PARALLELISM = 4;
     public static final MemorySize START_MEMORY_SEGMENT_SIZE = MemorySize.parse("4 kb");
     public static final MemorySize MIN_MEMORY_SEGMENT_SIZE = MemorySize.parse("256 b");
     public static final Duration DEBLOATING_TARGET = Duration.of(300, ChronoUnit.MILLIS);
-    public static final int RECORD_SIZE = (int) MemorySize.parse("64b").getBytes();
+    public static final MemorySize DEBLOATING_RECORD_SIZE = MemorySize.parse("64b");
+    public static final MemorySize UNALIGNED_RECORD_SIZE = MemorySize.parse("1kb");
 
     public static void main(String[] args) throws RunnerException {
         Options options =
                 new OptionsBuilder()
                         .verbosity(VerboseMode.NORMAL)
-                        .include(BufferDebloatedCheckpointTimeBenchmark.class.getCanonicalName())
+                        .include(CheckpointingTimeBenchmark.class.getCanonicalName())
                         .build();
 
         new Runner(options).run();
     }
 
     @Benchmark
-    public void debloatedCheckpointSingleInput(DebloatedCheckpointEnvironmentContext context)
+    public void checkpointSingleInput(DebloatedCheckpointEnvironmentContext context)
             throws Exception {
         final CompletableFuture<String> checkpoint =
                 context.miniCluster.triggerCheckpoint(context.jobID);
         checkpoint.get();
     }
 
+    public enum CheckpointingMode {
+        UNALIGNED(
+                config -> {
+                    config.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true);
+                    config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, START_MEMORY_SEGMENT_SIZE);
+                    config.set(
+                            ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT,
+                            Duration.ofMillis(0));
+                    config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, false);
+                    return config;
+                },
+                () -> null,
+                UNALIGNED_RECORD_SIZE),
+        ALTERNATING(

Review comment:
       OK, I understand your confusion. No, the checkpoints are unaligned, 
with or without a timeout. `Alternating` relates to how it's implemented 
internally, mostly in order to support aligned savepoints; but yes, unaligned 
checkpoint barriers with a timeout are also implemented using aligned 
checkpoint barriers. That, however, is the perspective of a given 
`CheckpointBarrierHandler`, which can expect either aligned or unaligned 
checkpoint barriers to arrive. From the perspective of a job/user, there 
is no alternation. There are just unaligned checkpoints with different timeout 
values. The term "Alternating" is not used anywhere in the docs, public API, or configs.
   
   In other words, it's a detail of how `CheckpointBarrier` is 
implemented. It could have been implemented differently, for example by 
squashing the unaligned and aligned `CheckpointBarrierHandler` code into a 
single `UnalignedAndAlignedCheckpointBarrierHandler` that supports both 
aligned and unaligned checkpoints. However, that would duplicate the code of the 
aligned version of the `CheckpointBarrierHandler`, so it's implemented as a 
`CheckpointBarrierHandler` that can alternate between aligned/unaligned 
barriers.
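
   To illustrate the point, here is a toy model in plain Java. It is not Flink's code: `AlternatingHandlerSketch`, `Handler`, `BarrierKind` and `classify` are hypothetical names made up for this sketch, and the rule "a barrier is handled as aligned until the timeout has elapsed, savepoints always stay aligned" is a simplification of what is described above.

```java
import java.time.Duration;

/** Toy model only; none of these types exist in Flink. */
public class AlternatingHandlerSketch {

    /** How a single barrier ends up being handled. */
    public enum BarrierKind { ALIGNED, UNALIGNED }

    /** Hypothetical handler that decides per barrier instead of per job. */
    public static final class Handler {
        private final Duration alignedTimeout;

        public Handler(Duration alignedTimeout) {
            this.alignedTimeout = alignedTimeout;
        }

        /**
         * Assumption of this sketch: savepoints are always aligned, and a
         * checkpoint barrier is treated as aligned until the configured
         * timeout has elapsed since the checkpoint was triggered.
         */
        public BarrierKind classify(boolean isSavepoint, long elapsedMillis) {
            if (isSavepoint) {
                return BarrierKind.ALIGNED;
            }
            return elapsedMillis >= alignedTimeout.toMillis()
                    ? BarrierKind.UNALIGNED
                    : BarrierKind.ALIGNED;
        }
    }

    public static void main(String[] args) {
        // Timeout 0: every checkpoint barrier is unaligned right away.
        Handler noTimeout = new Handler(Duration.ofMillis(0));
        // Non-zero timeout: the same handler sees aligned barriers first.
        Handler withTimeout = new Handler(Duration.ofMillis(300));

        System.out.println(noTimeout.classify(false, 0));
        System.out.println(withTimeout.classify(false, 100));
        System.out.println(withTimeout.classify(false, 500));
        System.out.println(withTimeout.classify(true, 500));
    }
}
```

   From the outside there is only one knob, the timeout. The switching between the two barrier kinds stays inside the handler, which is why the term does not need to show up in a benchmark parameter name.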




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

