pnowojski commented on a change in pull request #32:
URL: https://github.com/apache/flink-benchmarks/pull/32#discussion_r716792496



##########
File path: src/main/java/org/apache/flink/benchmark/CheckpointingTimeBenchmark.java
##########
@@ -97,43 +100,119 @@
         })
 @Warmup(iterations = 4)
 @Measurement(iterations = 10)
-public class BufferDebloatedCheckpointTimeBenchmark {
+public class CheckpointingTimeBenchmark {
     public static final int JOB_PARALLELISM = 4;
     public static final MemorySize START_MEMORY_SEGMENT_SIZE = MemorySize.parse("4 kb");
     public static final MemorySize MIN_MEMORY_SEGMENT_SIZE = MemorySize.parse("256 b");
     public static final Duration DEBLOATING_TARGET = Duration.of(300, ChronoUnit.MILLIS);
-    public static final int RECORD_SIZE = (int) MemorySize.parse("64b").getBytes();
+    public static final MemorySize DEBLOATING_RECORD_SIZE = MemorySize.parse("64b");
+    public static final MemorySize UNALIGNED_RECORD_SIZE = MemorySize.parse("1kb");
 
     public static void main(String[] args) throws RunnerException {
         Options options =
                 new OptionsBuilder()
                         .verbosity(VerboseMode.NORMAL)
-                        .include(BufferDebloatedCheckpointTimeBenchmark.class.getCanonicalName())
+                        .include(CheckpointingTimeBenchmark.class.getCanonicalName())
                         .build();
 
         new Runner(options).run();
     }
 
     @Benchmark
-    public void debloatedCheckpointSingleInput(DebloatedCheckpointEnvironmentContext context)
+    public void checkpointSingleInput(DebloatedCheckpointEnvironmentContext context)
             throws Exception {
         final CompletableFuture<String> checkpoint =
                 context.miniCluster.triggerCheckpoint(context.jobID);
         checkpoint.get();
     }
 
+    public enum CheckpointingMode {
+        UNALIGNED(
+                config -> {
+                    config.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true);
+                    config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, START_MEMORY_SEGMENT_SIZE);
+                    config.set(
+                            ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT,
+                            Duration.ofMillis(0));
+                    config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, false);
+                    return config;
+                },
+                () -> null,
+                UNALIGNED_RECORD_SIZE),
+        ALTERNATING(

Review comment:
       I don't see what is alternating here. `timeout=0` means unaligned 
checkpoints that start as unaligned from the get-go. `timeout=1` is a setup 
where checkpoint barriers start aligned and can time out to unaligned after a 
set period of time. For a given job execution it's always one or the other, so 
nothing is "alternating".
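
To make the distinction concrete, here is a toy model of the rule described above. It is a sketch only, not Flink code: `BarrierModeSketch`, `BarrierMode`, and `modeFor` are hypothetical names, and it assumes unaligned checkpoints are enabled so that the aligned checkpoint timeout is the only thing that varies.

```java
import java.time.Duration;

/** Toy model (not Flink code) of how the aligned checkpoint timeout picks a barrier mode. */
public class BarrierModeSketch {

    enum BarrierMode { ALIGNED, UNALIGNED }

    /**
     * @param alignedTimeout configured aligned checkpoint timeout, fixed for the whole job
     * @param alignmentTime  hypothetical time barrier alignment would need for one checkpoint
     * @return the mode in which that checkpoint ends up being taken
     */
    static BarrierMode modeFor(Duration alignedTimeout, Duration alignmentTime) {
        if (alignedTimeout.isZero()) {
            // timeout=0: barriers are unaligned from the start, alignment is never attempted.
            return BarrierMode.UNALIGNED;
        }
        // timeout>0: barriers start aligned and switch to unaligned only if
        // alignment takes longer than the configured timeout.
        return alignmentTime.compareTo(alignedTimeout) > 0
                ? BarrierMode.UNALIGNED
                : BarrierMode.ALIGNED;
    }

    public static void main(String[] args) {
        Duration slowAlignment = Duration.ofMillis(50);
        Duration fastAlignment = Duration.ofNanos(1000);

        System.out.println(modeFor(Duration.ZERO, slowAlignment));
        System.out.println(modeFor(Duration.ofMillis(1), fastAlignment));
        System.out.println(modeFor(Duration.ofMillis(1), slowAlignment));
    }
}
```

In this model the timeout is a fixed input, so a job configured with `timeout=0` never attempts alignment, and a job configured with `timeout=1` always starts aligned. Neither setup switches between the two configurations during one execution.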




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
