showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r922760493


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final 
Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String 
memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId 
{}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();

Review Comment:
   nice catch!



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final 
Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String 
memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId 
{}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is 
still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", 
future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < 
joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+        }
+
+        // 1. if joinPrepareTime has expired, return true
+        // 2. if offset commit haven't done or failed with retryable 
exception, return false
+        // 3. if offset commit failed with no-retryable exception, return true
+        // 4. if offset commit success, return true

Review Comment:
   Also, since we've changed the logic here, could you also add some tests for 
it? You can refer to these tests: 
`ConsumerCoordinatorTest#testJoinPrepareAndCommitCompleted, 
ConsumerCoordinatorTest#testJoinPrepareWithDisableAutoCommit`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final 
Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String 
memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId 
{}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is 
still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", 
future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < 
joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+        }
+
+        // 1. if joinPrepareTime has expired, return true
+        // 2. if offset commit haven't done or failed with retryable 
exception, return false
+        // 3. if offset commit failed with no-retryable exception, return true
+        // 4. if offset commit success, return true
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (joinPrepareTimer.isExpired()) {
+                log.error("Asynchronous auto-commit of offsets failed: 
joinPrepare timeout");
+            } else if (!autoCommitOffsetRequestFuture.isDone() ||
+                autoCommitOffsetRequestFuture.failed() && 
autoCommitOffsetRequestFuture.isRetriable()) {
+                onJoinPrepareAsyncCommitCompleted = false;

Review Comment:
   Maybe add a debug log here:
   `log.debug("Asynchronous auto-commit of offsets failed with retriable error: 
{}. Will retry it.", autoCommitOffsetRequestFuture.exception().getMessage());`
   



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final 
Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String 
memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId 
{}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is 
still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", 
future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < 
joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+        }

Review Comment:
   Should we update both timers here to reflect the time spent waiting in the client poll? i.e. 
   ```java
           if (autoCommitOffsetRequestFuture != null) {
               Timer pollTimer = timer.remainingMs() < 
joinPrepareTimer.remainingMs() ?
                      timer : joinPrepareTimer;
               client.poll(autoCommitOffsetRequestFuture, pollTimer);
              
               timer.update();
               joinPrepareTimer.update();
           }
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -141,6 +141,9 @@ private boolean sameRequest(final Set<TopicPartition> 
currentRequest, final Gene
     }
 
     private final RebalanceProtocol protocol;
+    // pending commit offset request in onJoinPrepare
+    private RequestFuture<Void> autoCommitOffsetRequestFuture = null;
+    private Timer joinPrepareTimer = null;

Review Comment:
   Please also add a comment for `joinPrepareTimer`, e.g.:
   `// a timer for join prepare to know when to stop. It'll be set to the rebalance 
timeout so that the member can join the group successfully even if the offset 
commit failed.`
   WDYT?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final 
Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String 
memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId 
{}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is 
still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", 
future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < 
joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+        }
+
+        // 1. if joinPrepareTime has expired, return true
+        // 2. if offset commit haven't done or failed with retryable 
exception, return false
+        // 3. if offset commit failed with no-retryable exception, return true
+        // 4. if offset commit success, return true

Review Comment:
   It's not clear what it means to return true/false here. Maybe:
   ```
   // keep retrying the offset commit when:
   // 1. offset commit hasn't completed (and joinPrepareTimer has not expired)
   // 2. offset commit failed with a retriable exception (and joinPrepareTimer has not expired)
   // Otherwise, continue to revoke partitions, e.g.:
   // 1. if joinPrepareTimer has expired
   // 2. if offset commit failed with a non-retriable exception
   // 3. if offset commit succeeded
   ```
   WDYT?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final 
Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String 
memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId 
{}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is 
still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", 
future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < 
joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+        }
+
+        // 1. if joinPrepareTime has expired, return true
+        // 2. if offset commit haven't done or failed with retryable 
exception, return false
+        // 3. if offset commit failed with no-retryable exception, return true
+        // 4. if offset commit success, return true
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (joinPrepareTimer.isExpired()) {
+                log.error("Asynchronous auto-commit of offsets failed: 
joinPrepare timeout");
+            } else if (!autoCommitOffsetRequestFuture.isDone() ||
+                autoCommitOffsetRequestFuture.failed() && 
autoCommitOffsetRequestFuture.isRetriable()) {
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && 
!autoCommitOffsetRequestFuture.isRetriable()) {
+                log.error("Asynchronous auto-commit of offsets failed: {}", 
autoCommitOffsetRequestFuture.exception().getMessage());
+            }
+            if (autoCommitOffsetRequestFuture.isDone()) {
+                autoCommitOffsetRequestFuture = null;
+            }
+        }
+        if (!onJoinPrepareAsyncCommitCompleted) {
+            timer.sleep(Math.min(timer.remainingMs(), 
rebalanceConfig.retryBackoffMs));
+            return false;

Review Comment:
   `joinPrepareTimer` should also update() after sleep, right?
   `joinPrepareTimer.update();`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -809,11 +835,12 @@ else if (future.failed() && !future.isRetriable()) {
 
         isLeader = false;
         subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;

Review Comment:
   We should also reset `autoCommitOffsetRequestFuture` to null before completing 
it, right? Otherwise, the next time `onJoinPrepare` is called, it will think there's 
an in-flight request, right?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final 
Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String 
memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId 
{}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is 
still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", 
future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer.

Review Comment:
   nit: // wait for commit offset response before the timer expires



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final 
Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String 
memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId 
{}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is 
still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", 
future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < 
joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+        }
+
+        // 1. if joinPrepareTime has expired, return true
+        // 2. if offset commit haven't done or failed with retryable 
exception, return false
+        // 3. if offset commit failed with no-retryable exception, return true
+        // 4. if offset commit success, return true
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (joinPrepareTimer.isExpired()) {
+                log.error("Asynchronous auto-commit of offsets failed: 
joinPrepare timeout");

Review Comment:
   Maybe change to:
   `log.error("Asynchronous auto-commit of offsets failed: joinPrepare timeout. 
Will continue to join group");`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final 
Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String 
memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId 
{}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
         // async commit offsets prior to rebalance if auto-commit enabled

Review Comment:
   We should update this comment to:
   `// async commit offsets prior to rebalance if auto-commit enabled and there 
is no in-flight offset commit request`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,47 @@ private void validateCooperativeAssignment(final 
Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String 
memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId 
{}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
+        } else {
+            joinPrepareTimer.update();
+        }
         // async commit offsets prior to rebalance if auto-commit enabled
-        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is 
still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", 
future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
         }
 
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < 
joinPrepareTimer.remainingMs() ?
+                   timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
+        }
+
+        // 1. if joinPrepareTime has expired, return true
+        // 2. if offset commit haven't done or failed with retryable 
exception, return false
+        // 3. if offset commit failed with no-retryable exception, return true
+        // 4. if offset commit success, return true
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (joinPrepareTimer.isExpired()) {
+                log.error("Asynchronous auto-commit of offsets failed: 
joinPrepare timeout");
+            } else if (!autoCommitOffsetRequestFuture.isDone() ||
+                autoCommitOffsetRequestFuture.failed() && 
autoCommitOffsetRequestFuture.isRetriable()) {
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && 
!autoCommitOffsetRequestFuture.isRetriable()) {
+                log.error("Asynchronous auto-commit of offsets failed: {}", 
autoCommitOffsetRequestFuture.exception().getMessage());

Review Comment:
   nit: `log.error("Asynchronous auto-commit of offsets failed: {}. Will 
continue to join group.", 
autoCommitOffsetRequestFuture.exception().getMessage());`
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to