jihoonson commented on a change in pull request #11770:
URL: https://github.com/apache/druid/pull/11770#discussion_r762366132



##########
File path: 
server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java
##########
@@ -27,59 +27,295 @@
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import com.google.inject.Inject;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.druid.curator.inventory.CuratorInventoryManager;
+import org.apache.druid.curator.inventory.CuratorInventoryManagerStrategy;
+import org.apache.druid.curator.inventory.InventoryManagerConfig;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This class is deprecated. Use {@link HttpServerInventoryView} instead.
  */
 @Deprecated
 @ManageLifecycle
-public class BatchServerInventoryView extends 
AbstractCuratorServerInventoryView<Set<DataSegment>>
-    implements FilteredServerInventoryView
+public class BatchServerInventoryView implements ServerInventoryView, 
FilteredServerInventoryView
 {
   private static final EmittingLogger log = new 
EmittingLogger(BatchServerInventoryView.class);
 
+  private final CuratorInventoryManager<DruidServer, Set<DataSegment>> 
inventoryManager;
+  private final AtomicBoolean started = new AtomicBoolean(false);
+
+  private final ConcurrentMap<ServerRemovedCallback, Executor> 
serverRemovedCallbacks = new ConcurrentHashMap<>();
+  private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = 
new ConcurrentHashMap<>();
+
   private final ConcurrentMap<String, Set<DataSegment>> zNodes = new 
ConcurrentHashMap<>();
   private final ConcurrentMap<SegmentCallback, 
Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates =
       new ConcurrentHashMap<>();
   private final Predicate<Pair<DruidServerMetadata, DataSegment>> 
defaultFilter;
 
-  @Inject
   public BatchServerInventoryView(
       final ZkPathsConfig zkPaths,
       final CuratorFramework curator,
       final ObjectMapper jsonMapper,
-      final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter
+      final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter,
+      final String pathChildrenCacheExecPrefix
   )
   {
-    super(
-        log,
-        zkPaths.getAnnouncementsPath(),
-        zkPaths.getLiveSegmentsPath(),
+    this.inventoryManager = new CuratorInventoryManager<>(
         curator,
-        jsonMapper,
-        new TypeReference<Set<DataSegment>>()
+        new InventoryManagerConfig()
         {
+          @Override
+          public String getContainerPath()
+          {
+            return zkPaths.getAnnouncementsPath();
+          }
+
+          @Override
+          public String getInventoryPath()
+          {
+            return zkPaths.getLiveSegmentsPath();
+          }
+        },
+        Execs.singleThreaded(pathChildrenCacheExecPrefix + "-%s"),
+        new CuratorInventoryManagerStrategy<DruidServer, Set<DataSegment>>()
+        {
+          @Override
+          public DruidServer deserializeContainer(byte[] bytes)
+          {
+            try {
+              return jsonMapper.readValue(bytes, DruidServer.class);
+            }
+            catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+
+          @Override
+          public Set<DataSegment> deserializeInventory(byte[] bytes)
+          {
+            try {
+              return jsonMapper.readValue(bytes, new 
TypeReference<Set<DataSegment>>()
+              {
+              });
+            }
+            catch (IOException e) {
+              log.error(e, "Could not parse json: %s", 
StringUtils.fromUtf8(bytes));
+              throw new RuntimeException(e);
+            }
+          }
+
+          @Override
+          public void newContainer(DruidServer container)
+          {
+            log.info("New Server[%s]", container);
+          }
+
+          @Override
+          public void deadContainer(DruidServer deadContainer)
+          {
+            log.info("Server Disappeared[%s]", deadContainer);
+            runServerRemovedCallbacks(deadContainer);
+          }
+
+          @Override
+          public DruidServer updateContainer(DruidServer oldContainer, 
DruidServer newContainer)
+          {
+            return newContainer.addDataSegments(oldContainer);
+          }
+
+          @Override
+          public DruidServer addInventory(
+              final DruidServer container,
+              String inventoryKey,
+              final Set<DataSegment> inventory
+          )
+          {
+            return addInnerInventory(container, inventoryKey, inventory);
+          }
+
+          @Override
+          public DruidServer updateInventory(DruidServer container, String 
inventoryKey, Set<DataSegment> inventory)
+          {
+            return updateInnerInventory(container, inventoryKey, inventory);
+          }
+
+          @Override
+          public DruidServer removeInventory(final DruidServer container, 
String inventoryKey)
+          {
+            return removeInnerInventory(container, inventoryKey);
+          }
+
+          @Override
+          public void inventoryInitialized()
+          {
+            log.info("Inventory Initialized");
+            runSegmentCallbacks(SegmentCallback::segmentViewInitialized);
+          }
         }
     );
 
     this.defaultFilter = Preconditions.checkNotNull(defaultFilter);
   }
 
+  @LifecycleStart
+  public void start() throws Exception
+  {
+    synchronized (started) {
+      if (!started.get()) {
+        inventoryManager.start();
+        started.set(true);
+      }
+    }
+  }
+
+  @LifecycleStop
+  public void stop() throws IOException
+  {
+    synchronized (started) {
+      if (started.getAndSet(false)) {
+        inventoryManager.stop();
+      }
+    }
+  }
+
   @Override
+  public boolean isStarted()
+  {
+    return started.get();
+  }
+
+  @Override
+  public DruidServer getInventoryValue(String containerKey)

Review comment:
       i think you are right, but am not so sure why it's named that way. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to