[ 
https://issues.apache.org/jira/browse/BEAM-4286?focusedWorklogId=101970&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101970
 ]

ASF GitHub Bot logged work on BEAM-4286:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/May/18 01:23
            Start Date: 15/May/18 01:23
    Worklog Time Spent: 10m 
      Work Description: jkff commented on a change in pull request #5359: 
[BEAM-4286] Implement pooled artifact source
URL: https://github.com/apache/beam/pull/5359#discussion_r188142354
 
 

 ##########
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/ArtifactSourcePool.java
 ##########
 @@ -39,23 +44,134 @@
 @ThreadSafe
 public class ArtifactSourcePool implements ArtifactSource {
 
+  public static ArtifactSourcePool create() {
+    return new ArtifactSourcePool();
+  }
+
+  private final Object lock = new Object();
+  private final Map<ArtifactSource, ArtifactSourceLock> artifactSources = Maps.newLinkedHashMap();
+
   private ArtifactSourcePool() {}
 
   /**
    * Adds a new cache to the pool. When the returned {@link AutoCloseable} is closed, the given
-   * cache will be removed from the pool.
+   * cache will be removed from the pool. The call to {@link AutoCloseable#close()} will block until
+   * the artifact source is no longer being used.
    */
   public AutoCloseable addToPool(ArtifactSource artifactSource) {
-    throw new UnsupportedOperationException();
+    synchronized (lock) {
+      checkState(!artifactSources.containsKey(artifactSource));
+      artifactSources.put(artifactSource, new ArtifactSourceLock());
+      return () -> {
+        synchronized (lock) {
+          ArtifactSourceLock innerLock = artifactSources.remove(artifactSource);
+          checkState(innerLock != null);
+          innerLock.close();
+        }
+      };
+    }
   }
 
   @Override
   public Manifest getManifest() throws IOException {
-    throw new UnsupportedOperationException();
+    ArtifactSource source;
+    SourceHandle sourceHandle;
+    synchronized (lock) {
+      checkState(!artifactSources.isEmpty());
+      Map.Entry<ArtifactSource, ArtifactSourceLock> entry =
+          artifactSources.entrySet().iterator().next();
+      source = entry.getKey();
+      sourceHandle = entry.getValue().open();
+    }
+    try {
+      return source.getManifest();
+    } finally {
+      sourceHandle.close();
+    }
   }
 
   @Override
   public void getArtifact(String name, StreamObserver<ArtifactChunk> responseObserver) {
-    throw new UnsupportedOperationException();
+    ArtifactSource source;
+    SourceHandle sourceHandle;
+    synchronized (lock) {
+      checkState(!artifactSources.isEmpty());
 
 Review comment:
   It seems you can deduplicate this with the previous method by extracting a
method `SourceHandle getAny()`, provided you make `SourceHandle` hold the
`ArtifactSource` too.
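
   For illustration, the suggestion could look roughly like the sketch below. It is
not the PR's code: `Source`, `Usage`, and `PoolSketch` are hypothetical stand-ins,
and since the internals of `ArtifactSourceLock` are not shown in this diff, a plain
per-source counter is assumed in its place. The point is only that once the handle
carries the source, every caller can go through one `getAny()` helper.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for ArtifactSource; only what the sketch needs. */
interface Source {
  String getManifest();
}

/** Sketch of a pool whose handle carries the source it guards. */
final class PoolSketch {
  /** Assumed replacement for ArtifactSourceLock: counts handles currently open. */
  private static final class Usage {
    int activeHandles;
  }

  /** Bundles the chosen source with the bookkeeping needed to release it. */
  final class SourceHandle implements AutoCloseable {
    private final Source source;
    private final Usage usage;

    private SourceHandle(Source source, Usage usage) {
      this.source = source;
      this.usage = usage;
    }

    Source getSource() {
      return source;
    }

    @Override
    public void close() {
      synchronized (lock) {
        usage.activeHandles--;
        lock.notifyAll(); // wake a remover waiting for this source to go idle
      }
    }
  }

  private final Object lock = new Object();
  private final Map<Source, Usage> sources = new LinkedHashMap<>();

  /** Registers a source; closing the result removes it and waits until it is unused. */
  AutoCloseable addToPool(Source source) {
    synchronized (lock) {
      if (sources.containsKey(source)) {
        throw new IllegalStateException("source is already pooled");
      }
      sources.put(source, new Usage());
    }
    return () -> {
      synchronized (lock) {
        Usage usage = sources.remove(source);
        if (usage == null) {
          throw new IllegalStateException("source was not pooled");
        }
        while (usage.activeHandles > 0) {
          lock.wait(); // releases the pool lock while blocked
        }
      }
    };
  }

  /** The single shared lookup: picks the first pooled source and marks it in use. */
  SourceHandle getAny() {
    synchronized (lock) {
      if (sources.isEmpty()) {
        throw new IllegalStateException("pool is empty");
      }
      Map.Entry<Source, Usage> first = sources.entrySet().iterator().next();
      first.getValue().activeHandles++;
      return new SourceHandle(first.getKey(), first.getValue());
    }
  }

  /** Callers shrink to a try-with-resources around getAny(). */
  String getManifest() {
    try (SourceHandle handle = getAny()) {
      return handle.getSource().getManifest();
    }
  }
}
```

   With this shape, `getArtifact` would presumably use the same
`try (SourceHandle handle = getAny())` pattern, so the empty-pool check and the
entry lookup live in one place.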

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 101970)
    Time Spent: 50m  (was: 40m)

> Pooled artifact source
> ----------------------
>
>                 Key: BEAM-4286
>                 URL: https://issues.apache.org/jira/browse/BEAM-4286
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Ben Sidhom
>            Assignee: Ben Sidhom
>            Priority: Minor
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Because DistributedCache lifetimes are tied to operator lifetimes in Flink, we 
> need a way to wrap operator-scoped artifact sources. Artifacts are inherently 
> job-scoped and should be the same throughout a job's lifetime. For this 
> reason, it is safe to pool artifact sources and serve artifacts from an 
> arbitrary pooled source as long as the underlying source is still in scope.
> We need a pooled source in order to satisfy the bundle factory interfaces. 
> Using the job-scoped and stage-scoped bundle factories allows us to cache and 
> reuse different components that serve SDK harnesses. Because the distributed 
> cache lifetimes are specific to Flink, the pooled artifact source should 
> probably live in a runner-specific directory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
