zentol commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r920140769


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -18,26 +18,42 @@
 
 package org.apache.flink.core.testutils;
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * A thread that additionally catches exceptions and offers a joining method 
that re-throws the
  * exceptions.
  *
- * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link 
Runnable}), one needs to
- * extends this class and implement the {@link #go()} method. That method may 
throw exceptions.
+ * <p>This class needs to supply a {@link RunnableWithException} that may 
throw exceptions or
+ * override {@link #go()} method.
+ *
+ * <p>you can use it as the same way of using threads like: {@code new 
Thread(Runnable runnable)} or
+ * {@Code new Thread()} and then override {@link Thread#run()} method. Just 
change it to {@code new
+ * CheckedThread(RunnableWithException runnableWithException)} or {@Code new 
CheckedThread()} and
+ * then override {@link CheckedThread#go()} method.
  *
- * <p>Exception from the {@link #go()} method are caught and re-thrown when 
joining this thread via
- * the {@link #sync()} method.
+ * <p>Exception from the {@link #runnable} or the override {@link #go()} are 
caught and re-thrown
+ * when joining this thread via the {@link #sync()} method.
  */
-public abstract class CheckedThread extends Thread {
+public class CheckedThread extends Thread {

Review Comment:
   There shouldn't be 2 ways to achieve the same thing; it just makes the class 
more complicated. As is you can create a CheckedThread without implementing 
`go()` nor providing a `Runnable` (or the exact opposite, doing both!), making 
it unclear how you are even supposed to implement it
   
   `Thread#run` possibly exists because at the time it was added passing a 
`Runnable` via the constructor just wasn't possible. `run()/go()` have always 
been problematic because they are methods visible to the user, despite the user 
not being supposed to ever call them.
   
   ReceiverThread is easy to migrate. Just encapsulate all the logic from the 
thread into a separate class, and have the `Runnable` refer to that. (See 
below).
   
   ```
   public class ReceiverThread extends CheckedThread {
       protected static final Logger LOG = 
LoggerFactory.getLogger(ReceiverThread.class);
   
       private final State state;
   
       protected ReceiverThread(State state) {
           super(
                   () -> {
                       try {
                           while (state.running) {
                               
state.readRecords(state.getExpectedRecord().get());
                               state.finishProcessingExpectedRecords();
                           }
                       } catch (InterruptedException e) {
                           if (state.running) {
                               throw e;
                           }
                       } catch (Exception e) {
                           e.printStackTrace();
                       }
                   });
           setName(this.getClass().getName());
           this.state = state;
       }
   
       public synchronized CompletableFuture<?> setExpectedRecord(long record) {
           return state.setExpectedRecord(record);
       }
   
       public void shutdown() {
           state.running = false;
           interrupt();
           state.expectedRecord.complete(0L);
       }
   
       protected abstract static class State {
           protected final int expectedRepetitionsOfExpectedRecord;
   
           protected int expectedRecordCounter;
           protected CompletableFuture<Long> expectedRecord = new 
CompletableFuture<>();
           protected CompletableFuture<?> recordsProcessed = new 
CompletableFuture<>();
   
           protected volatile boolean running;
   
           protected State(int expectedRepetitionsOfExpectedRecord) {
               this.expectedRepetitionsOfExpectedRecord = 
expectedRepetitionsOfExpectedRecord;
           }
   
           public synchronized CompletableFuture<?> setExpectedRecord(long 
record) {
               checkState(!expectedRecord.isDone());
               checkState(!recordsProcessed.isDone());
               expectedRecord.complete(record);
               expectedRecordCounter = 0;
               return recordsProcessed;
           }
   
           private synchronized CompletableFuture<Long> getExpectedRecord() {
               return expectedRecord;
           }
   
           protected abstract void readRecords(long lastExpectedRecord) throws 
Exception;
   
           private synchronized void finishProcessingExpectedRecords() {
               checkState(expectedRecord.isDone());
               checkState(!recordsProcessed.isDone());
   
               recordsProcessed.complete(null);
               expectedRecord = new CompletableFuture<>();
               recordsProcessed = new CompletableFuture<>();
           }
       }
   }
   ```
   
   ```
   public class SerializingLongReceiver extends ReceiverThread {
   
       private SerializingLongReceiver(State state) {
           super(state);
       }
   
       public static SerializingLongReceiver create(
               InputGate inputGate, int expectedRepetitionsOfExpectedRecord) {
           return new SerializingLongReceiver(
                   new State(inputGate, expectedRepetitionsOfExpectedRecord));
       }
   
       private static class State extends ReceiverThread.State {
   
           private final MutableRecordReader<LongValue> reader;
   
           private State(InputGate inputGate, int 
expectedRepetitionsOfExpectedRecord) {
               super(expectedRepetitionsOfExpectedRecord);
               this.reader =
                       new MutableRecordReader<>(
                               inputGate,
                               new String[] 
{EnvironmentInformation.getTemporaryFileDirectory()});
           }
   
           @Override
           protected void readRecords(long lastExpectedRecord) throws Exception 
{
               LOG.debug("readRecords(lastExpectedRecord = {})", 
lastExpectedRecord);
               final LongValue value = new LongValue();
   
               while (running && reader.next(value)) {
                   final long ts = value.getValue();
                   if (ts == lastExpectedRecord) {
                       expectedRecordCounter++;
                       if (expectedRecordCounter == 
expectedRepetitionsOfExpectedRecord) {
                           break;
                       }
                   }
               }
           }
       }
   }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to