weisong44 commented on a change in pull request #1031: SAMZA-2191: Batch 
support for table APIs
URL: https://github.com/apache/samza/pull/1031#discussion_r284387573
 
 

 ##########
 File path: samza-core/src/main/java/org/apache/samza/table/batching/Batch.java
 ##########
 @@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.table.batch.BatchPolicy;
+
+/**
+ * Manages a sequence of {@link Operation}s, which will be performed as a batch.
+ * A batch can be configured with a {@code batchMaxSize} and/or {@code batchMaxDelay}.
+ * When the number of operations in the batch exceeds the {@code batchMaxSize}
+ * or the time window exceeds the {@code batchMaxDelay}, the batch will be performed.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+abstract class Batch<K, V> {
+  protected final int batchMaxSize;
+  protected final int batchMaxDelay;
+  protected final BatchHandler<K, V> batchHandler;
+  protected final CompletableFuture<Void> completableFuture;
+  protected boolean closed = false;
+
+  /**
+   * @param batchHandler Defines how the batch will be performed.
+   * @param batchPolicy Defines the batch configurations.
+   */
+  Batch(BatchHandler<K, V> batchHandler, BatchPolicy batchPolicy) {
+    this.batchHandler = batchHandler;
+    // The max number of {@link Operation}s that the batch can hold.
+    this.batchMaxSize = batchPolicy.getBatchMaxSize();
+    // The max time that the batch can last before being performed.
+    this.batchMaxDelay = batchPolicy.getBatchMaxDelay();
+    completableFuture = new CompletableFuture<>();
+  }
+
+  /**
+   * Add an operation to the batch.
+   *
+   * @param operation The operation to be added.
+   */
+  abstract void addOperation(Operation<K, V> operation);
+
+  /**
+   * Close the batch so that it will not accept more operations.
+   */
+  void close() {
+    closed = true;
+  }
+
+  /**
+   * @return Whether the batch has been closed and therefore no longer accepts operations.
+   */
+  boolean isClosed() {
+    return closed;
+  }
+
+  abstract Collection<Operation<K, V>> getQueryOperations();
+  abstract Collection<Operation<K, V>> getUpdateOperations();
+
+  private List<Operation<K, V>> getPutOperations() {
+    return getUpdateOperations().stream().filter(op -> op instanceof 
PutOperation)
+        .collect(Collectors.toList());
+  }
+
+  private List<Operation<K, V>> getDeleteOperations() {
+    return getUpdateOperations().stream().filter(op -> op instanceof 
DeleteOperation)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Perform batch operations.
+   */
+  CompletableFuture<Void> process() {
+    return CompletableFuture.allOf(
+        batchHandler.handleBatchPut(getPutOperations()),
+        batchHandler.handleBatchDelete(getDeleteOperations()),
+        batchHandler.handleBatchGet(getQueryOperations()))
+        .whenComplete((val, throwable) -> {
+            if (throwable != null) {
+              completableFuture.completeExceptionally(throwable);
+              throw new SamzaException("Batch failed", throwable);
 
 Review comment:
   Shouldn't throw here?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to