[ 
https://issues.apache.org/jira/browse/BEAM-6443?focusedWorklogId=211339&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-211339
 ]

ASF GitHub Bot logged work on BEAM-6443:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Mar/19 22:02
            Start Date: 11/Mar/19 22:02
    Worklog Time Spent: 10m 
      Work Description: ihji commented on pull request #7547: [BEAM-6443] 
decrease the number of thread for BigQuery streaming inse…
URL: https://github.com/apache/beam/pull/7547#discussion_r264449198
 
 

 ##########
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 ##########
 @@ -1001,4 +1007,141 @@ public void close() {
       client.close();
     }
   }
+
+  private static class BoundedExecutorService implements ExecutorService {
+    private final ExecutorService executor;
+    private final Semaphore semaphore;
+    private final int parallelism;
+
+    BoundedExecutorService(ExecutorService executor, int parallelism) {
+      this.executor = executor;
+      this.parallelism = parallelism;
+      this.semaphore = new Semaphore(parallelism);
+    }
+
+    @Override
+    public void shutdown() {
+      executor.shutdown();
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+      List<Runnable> runnables = executor.shutdownNow();
+      // try to release permits as many as possible before returning semaphored runnables.
+      synchronized (this) {
+        if (semaphore.availablePermits() <= parallelism) {
+          semaphore.release(Integer.MAX_VALUE - parallelism);
 
 Review comment:
   I don't think we have to pair acquire() and release(). From the release() 
API doc:
   
   > There is no requirement that a thread that releases a permit must have 
acquired that permit by calling acquire().
   > 
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Semaphore.html#release--
   
   The one edge case is that if calling release() pushes the total number of 
permits above Integer.MAX_VALUE, release() throws an Error (maximum permit 
count exceeded). Checking availablePermits() before release() inside the 
synchronized section avoids that case.
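   
   To illustrate both points, here is a minimal standalone sketch. It is not 
code from this PR; the class name and the `releaseAll` helper are hypothetical. 
It shows release() raising the permit count with no prior acquire(), and a 
guarded top-up that derives the headroom from availablePermits() so the total 
never passes Integer.MAX_VALUE:

```java
import java.util.concurrent.Semaphore;

class ReleaseWithoutAcquire {
  // Hypothetical helper (not from the PR): tops the semaphore up to
  // Integer.MAX_VALUE permits without overflowing the permit counter.
  // The lock only serializes callers of this helper; it assumes no other
  // code calls release() on the same semaphore concurrently.
  static void releaseAll(Semaphore semaphore) {
    synchronized (semaphore) {
      int available = semaphore.availablePermits();
      if (available < Integer.MAX_VALUE) {
        semaphore.release(Integer.MAX_VALUE - available);
      }
    }
  }

  public static void main(String[] args) {
    Semaphore semaphore = new Semaphore(2);

    // No acquire() was ever called, yet release() is legal and adds a permit.
    semaphore.release();
    System.out.println(semaphore.availablePermits()); // 3

    // The guarded top-up fills the semaphore exactly to the maximum.
    releaseAll(semaphore);
    System.out.println(semaphore.availablePermits() == Integer.MAX_VALUE); // true

    // A second call finds no headroom and does nothing.
    releaseAll(semaphore);

    // An unguarded release() at the maximum overflows and throws an Error.
    try {
      semaphore.release();
    } catch (Error e) {
      System.out.println("unguarded release failed: " + e.getMessage());
    }
  }
}
```

   The guard only helps if every bulk release goes through the same lock; an 
unguarded release() elsewhere could still overflow the counter.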
   
   Another option is to return the semaphored callables as is and document 
that clearly in a comment. I doubt this `BoundedExecutorService` class will be 
reused elsewhere anyway.
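 
   For readers without the full diff, here is a rough sketch of what a 
"semaphored" runnable could look like. The wrapper in the PR is not shown in 
this hunk, so the names `semaphored`, `boundedExecute` and 
`maxObservedConcurrency` are assumptions made for illustration. The submitter 
acquires a permit and the wrapper releases it when the task finishes, so a 
wrapped task that never runs (for example one handed back by shutdownNow()) 
never returns its permit:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoredRunnableSketch {
  // Hypothetical wrapper: runs the task, then always gives the permit back.
  static Runnable semaphored(Runnable task, Semaphore semaphore) {
    return () -> {
      try {
        task.run();
      } finally {
        semaphore.release();
      }
    };
  }

  // Blocks the caller until a permit is free, then submits the wrapped task.
  static void boundedExecute(ExecutorService executor, Semaphore semaphore, Runnable task)
      throws InterruptedException {
    semaphore.acquire();
    try {
      executor.execute(semaphored(task, semaphore));
    } catch (RuntimeException e) {
      semaphore.release(); // the task was rejected, so return its permit
      throw e;
    }
  }

  // Runs `tasks` short jobs on an unbounded pool and reports the highest
  // number of jobs that were observed running at the same time.
  static int maxObservedConcurrency(int parallelism, int tasks) throws InterruptedException {
    ExecutorService executor = Executors.newCachedThreadPool();
    Semaphore semaphore = new Semaphore(parallelism);
    AtomicInteger running = new AtomicInteger();
    AtomicInteger max = new AtomicInteger();
    for (int i = 0; i < tasks; i++) {
      boundedExecute(executor, semaphore, () -> {
        int now = running.incrementAndGet();
        max.accumulateAndGet(now, Math::max);
        try {
          Thread.sleep(5);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        running.decrementAndGet();
      });
    }
    executor.shutdown();
    executor.awaitTermination(10, TimeUnit.SECONDS);
    return max.get();
  }

  public static void main(String[] args) throws InterruptedException {
    // Even on a cached (unbounded) pool, at most 3 tasks run concurrently.
    System.out.println(maxObservedConcurrency(3, 20) <= 3);
  }
}
```

   With this shape, returning the wrapped runnables as is means a caller who 
later runs them also triggers release(). That is harmless only as long as the 
permit count cannot overflow, which is why it would need to be documented.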
 
----------------------------------------------------------------


Issue Time Tracking
-------------------

    Worklog Id:     (was: 211339)
    Time Spent: 4h 40m  (was: 4.5h)

> decrease the number of threads for BigQuery streaming insertAll
> ---------------------------------------------------------------
>
>                 Key: BEAM-6443
>                 URL: https://issues.apache.org/jira/browse/BEAM-6443
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Heejong Lee
>            Assignee: Heejong Lee
>            Priority: Major
>              Labels: triaged
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> When inserting a large number of very small elements into BigQuery via 
> streaming insertAll, BigQueryIO causes many quota-exceeded errors. This 
> implies that 1) BigQueryIO puts unnecessary overhead on the BigQuery API layer 
> by sending requests too fast and 2) the log file grows very large because the 
> same error message is repeated. Currently we use 50 shards for writing data 
> into BigQuery, and in each bundle 20-30 futures are executed simultaneously 
> with an unlimited thread pool. It would be worth investigating whether just a 
> single thread pool is sufficient for running concurrent insertAll calls.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
