[ 
https://issues.apache.org/jira/browse/BEAM-7948?focusedWorklogId=336858&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-336858
 ]

ASF GitHub Bot logged work on BEAM-7948:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/Oct/19 16:21
            Start Date: 31/Oct/19 16:21
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on pull request #9949: [BEAM-7948] 
Add time-based cache threshold support in the Java data s…
URL: https://github.com/apache/beam/pull/9949#discussion_r341231985
 
 

 ##########
 File path: 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
 ##########
 @@ -43,48 +57,83 @@
  */
 public class BeamFnDataBufferingOutboundObserver<T> implements 
CloseableFnDataReceiver<T> {
   // TODO: Consider moving this constant out of this class
+  /** @deprecated Use BEAM_FN_API_DATA_BUFFER_SIZE_LIMIT instead. */
+  @Deprecated
   public static final String BEAM_FN_API_DATA_BUFFER_LIMIT = 
"beam_fn_api_data_buffer_limit=";
+
+  public static final String BEAM_FN_API_DATA_BUFFER_SIZE_LIMIT =
+      "beam_fn_api_data_buffer_size_limit=";
+  public static final String BEAM_FN_API_DATA_BUFFER_TIME_LIMIT =
+      "beam_fn_api_data_buffer_time_limit=";
   @VisibleForTesting static final int DEFAULT_BUFFER_LIMIT_BYTES = 1_000_000;
+  private static final long DEFAULT_BUFFER_LIMIT_TIME_MS = -1L;
   private static final Logger LOG =
       LoggerFactory.getLogger(BeamFnDataBufferingOutboundObserver.class);
 
   public static <T> BeamFnDataBufferingOutboundObserver<T> forLocation(
+      PipelineOptions options,
       LogicalEndpoint endpoint,
       Coder<T> coder,
       StreamObserver<BeamFnApi.Elements> outboundObserver) {
-    return forLocationWithBufferLimit(
-        DEFAULT_BUFFER_LIMIT_BYTES, endpoint, coder, outboundObserver);
+    return new BeamFnDataBufferingOutboundObserver<>(
+        getSizeLimit(options), getTimeLimit(options), endpoint, coder, 
outboundObserver);
   }
 
-  public static <T> BeamFnDataBufferingOutboundObserver<T> 
forLocationWithBufferLimit(
-      int bufferLimit,
-      LogicalEndpoint endpoint,
-      Coder<T> coder,
-      StreamObserver<BeamFnApi.Elements> outboundObserver) {
-    return new BeamFnDataBufferingOutboundObserver<>(
-        bufferLimit, endpoint, coder, outboundObserver);
+  private static int getSizeLimit(PipelineOptions options) {
+    List<String> experiments = 
options.as(ExperimentalOptions.class).getExperiments();
+    for (String experiment : experiments == null ? 
Collections.<String>emptyList() : experiments) {
+      if (experiment.startsWith(BEAM_FN_API_DATA_BUFFER_SIZE_LIMIT)) {
+        return 
Integer.parseInt(experiment.substring(BEAM_FN_API_DATA_BUFFER_SIZE_LIMIT.length()));
+      }
+      if (experiment.startsWith(BEAM_FN_API_DATA_BUFFER_LIMIT)) {
+        return 
Integer.parseInt(experiment.substring(BEAM_FN_API_DATA_BUFFER_LIMIT.length()));
+      }
+    }
+    return DEFAULT_BUFFER_LIMIT_BYTES;
+  }
+
+  private static long getTimeLimit(PipelineOptions options) {
+    List<String> experiments = 
options.as(ExperimentalOptions.class).getExperiments();
+    for (String experiment : experiments == null ? 
Collections.<String>emptyList() : experiments) {
+      if (experiment.startsWith(BEAM_FN_API_DATA_BUFFER_TIME_LIMIT)) {
+        return 
Long.parseLong(experiment.substring(BEAM_FN_API_DATA_BUFFER_TIME_LIMIT.length()));
+      }
+    }
+    return DEFAULT_BUFFER_LIMIT_TIME_MS;
   }
 
   private long byteCounter;
   private long counter;
   private boolean closed;
-  private final int bufferLimit;
+  private final int sizeLimit;
   private final Coder<T> coder;
   private final LogicalEndpoint outputLocation;
   private final StreamObserver<BeamFnApi.Elements> outboundObserver;
   private final ByteString.Output bufferedElements;
+  @VisibleForTesting final ScheduledFuture<?> flushFuture;
+  private final AtomicReference<IOException> flushException = new 
AtomicReference<>(null);
+  private final Object lock = new Object();
 
   private BeamFnDataBufferingOutboundObserver(
 
 Review comment:
   I think we will want to have the original  implementation of this class so 
that we don't need to take on the synchronization overhead for flush.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 336858)
    Time Spent: 20m  (was: 10m)

> Add time-based cache threshold support in the Java data service
> ---------------------------------------------------------------
>
>                 Key: BEAM-7948
>                 URL: https://issues.apache.org/jira/browse/BEAM-7948
>             Project: Beam
>          Issue Type: Sub-task
>          Components: java-fn-execution
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently only size-based cache threshold is supported in data service. It 
> should also support the time-based cache threshold. This is very important, 
> especially for streaming jobs which are sensitive to the delay.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to