pnowojski commented on a change in pull request #32:
URL: https://github.com/apache/flink-benchmarks/pull/32#discussion_r716790459



##########
File path: 
src/main/java/org/apache/flink/benchmark/CheckpointingTimeBenchmark.java
##########
@@ -97,43 +100,119 @@
         })
 @Warmup(iterations = 4)
 @Measurement(iterations = 10)
-public class BufferDebloatedCheckpointTimeBenchmark {
+public class CheckpointingTimeBenchmark {
     public static final int JOB_PARALLELISM = 4;
     public static final MemorySize START_MEMORY_SEGMENT_SIZE = 
MemorySize.parse("4 kb");
     public static final MemorySize MIN_MEMORY_SEGMENT_SIZE = 
MemorySize.parse("256 b");
     public static final Duration DEBLOATING_TARGET = Duration.of(300, 
ChronoUnit.MILLIS);
-    public static final int RECORD_SIZE = (int) 
MemorySize.parse("64b").getBytes();
+    public static final MemorySize DEBLOATING_RECORD_SIZE = 
MemorySize.parse("64b");
+    public static final MemorySize UNALIGNED_RECORD_SIZE = 
MemorySize.parse("1kb");
 
     public static void main(String[] args) throws RunnerException {
         Options options =
                 new OptionsBuilder()
                         .verbosity(VerboseMode.NORMAL)
-                        
.include(BufferDebloatedCheckpointTimeBenchmark.class.getCanonicalName())
+                        
.include(CheckpointingTimeBenchmark.class.getCanonicalName())
                         .build();
 
         new Runner(options).run();
     }
 
     @Benchmark
-    public void 
debloatedCheckpointSingleInput(DebloatedCheckpointEnvironmentContext context)
+    public void checkpointSingleInput(DebloatedCheckpointEnvironmentContext 
context)
             throws Exception {
         final CompletableFuture<String> checkpoint =
                 context.miniCluster.triggerCheckpoint(context.jobID);
         checkpoint.get();
     }
 
+    public enum CheckpointingMode {
+        UNALIGNED(
+                config -> {
+                    config.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, 
true);
+                    config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 
START_MEMORY_SEGMENT_SIZE);
+                    config.set(
+                            
ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT,
+                            Duration.ofMillis(0));
+                    config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, 
false);
+                    return config;
+                },
+                () -> null,
+                UNALIGNED_RECORD_SIZE),
+        ALTERNATING(
+                config -> {
+                    config.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, 
true);
+                    config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 
START_MEMORY_SEGMENT_SIZE);
+                    config.set(
+                            
ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT,
+                            Duration.ofMillis(1));
+                    config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, 
false);
+                    return config;
+                },
+                () -> null,
+                UNALIGNED_RECORD_SIZE),
+        ALIGNED(
+                config -> {
+                    config.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, 
false);
+                    config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 
START_MEMORY_SEGMENT_SIZE);
+                    config.set(TaskManagerOptions.MIN_MEMORY_SEGMENT_SIZE, 
MIN_MEMORY_SEGMENT_SIZE);
+                    config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, 
true);
+                    config.set(TaskManagerOptions.BUFFER_DEBLOAT_TARGET, 
DEBLOATING_TARGET);
+                    config.set(
+                            TaskManagerOptions.BUFFER_DEBLOAT_PERIOD,
+                            Duration.of(10, ChronoUnit.MILLIS));
+                    config.set(TaskManagerOptions.BUFFER_DEBLOAT_SAMPLES, 5);
+                    return config;
+                },
+                () -> {
+                    // wait a bit for debloating to stabilize
+                    Thread.sleep(2_000);
+                    return null;
+                },
+                DEBLOATING_RECORD_SIZE);
+
+        private final Function<Configuration, Configuration> configFunc;
+        private final SupplierWithException<Void, Exception> postSetUpFunc;

Review comment:
       > I don't like the int setupSleepDelay because it is less flexible, plus 
I'd need to add an if (setupSleepDelay > 0).
   
   You could have the same argument in favour of replacing all of the regular 
function arguments with some lambda functions. Yes, it's less flexibly, but why 
use more complicated construct for something that could be expressed easily as 
a single parameter?
   
   > What's wrong with Supplier? It's just a function that does not take 
arguments.
   
   Readability. Putting aside strange `return null;` that does nothing, in 
order to understand what is the following line doing:
   ```
   mode.postSetUp();
   ```
   With supplier/function you need to jump through the code much more and this 
flexibility is actually hurting your understanding, as you need to see a 
definition of every possible `SupplierWithException<Void, Exception> 
postSetUpFunc` parameter to understand what is this single line doing in all of 
the cases. Currently, in this case, it's not that big of a deal, as everything 
is one file, relatively close to one another, but the point still stands. If 
you see
   ```
   mode.postSetUp() {
     postSetUpFunc.get();
   }
   ```
   it's very non obvious what is this doing. With
   ```
   Thread.sleep(mode.postSetUpSleep);
   ```
   it's much easier to understand.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to