ahmarsuhail commented on code in PR #7214:
URL: https://github.com/apache/hadoop/pull/7214#discussion_r1943106972


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -4354,22 +4257,25 @@ public void close() throws IOException {
   protected synchronized void stopAllServices() {
     try {
       trackDuration(getDurationTrackerFactory(), FILESYSTEM_CLOSE.getSymbol(), () -> {
-        closeAutocloseables(LOG, store);
+        closeAutocloseables(LOG, getStore());

Review Comment:
   @steveloughran can you explain how the lifecycle works with the Service 
   classes?
   
   We're calling `closeAutocloseables(LOG, getStore());` here, but Store doesn't 
   actually implement AutoCloseable, so I'm not sure whether we need to do 
   something else here (call `serviceStop()`?).
   
   I also need to figure out what changes to make so that our 
   s3SeekableInputStreamFactory is closed. Prior to these changes, I was 
   passing our AAL factory into `closeAutocloseables()`, which would call 
   `.s3SeekableInputStreamFactory.close()`.
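
   To make the question concrete, here is a minimal sketch of one possible 
   shape, assuming the store is a composite service and the stream factory is 
   registered as one of its children: stopping the store would then stop the 
   factory too. `MiniService` and `MiniCompositeService` are hypothetical 
   stand-ins written for this illustration, not the Hadoop classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a lifecycle-managed service; not the Hadoop API. */
interface MiniService {
  void stop();
}

/** Hypothetical parent service which stops its registered children when stopped. */
class MiniCompositeService implements MiniService {
  private final List<MiniService> children = new ArrayList<>();
  private boolean stopped;

  void addService(MiniService child) {
    children.add(child);
  }

  @Override
  public void stop() {
    if (stopped) {
      return; // a second stop() is a no-op
    }
    stopped = true;
    // stop children in reverse order of registration
    for (int i = children.size() - 1; i >= 0; i--) {
      children.get(i).stop();
    }
  }
}

/** Records the order in which the child services are stopped. */
final class LifecycleDemo {
  static List<String> run() {
    List<String> events = new ArrayList<>();
    MiniCompositeService store = new MiniCompositeService();
    store.addService(() -> events.add("clientManager stopped"));
    store.addService(() -> events.add("streamFactory stopped"));
    store.stop();
    store.stop(); // no further events expected
    return events;
  }
}
```

   In this sketch the filesystem only has to stop the store; the factory is 
   never passed to a separate close call.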
   



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java:
##########
@@ -176,25 +207,90 @@ public class S3AStoreImpl implements S3AStore {
       RateLimiting writeRateLimiter,
       AuditSpanSource<AuditSpanS3A> auditSpanSource,
       @Nullable FileSystem.Statistics fsStatistics) {
-    this.storeContextFactory = requireNonNull(storeContextFactory);
+    super("S3AStore");
+    this.auditSpanSource = requireNonNull(auditSpanSource);
     this.clientManager = requireNonNull(clientManager);
     this.durationTrackerFactory = requireNonNull(durationTrackerFactory);
+    this.fsStatistics = fsStatistics;
     this.instrumentation = requireNonNull(instrumentation);
     this.statisticsContext = requireNonNull(statisticsContext);
+    this.storeContextFactory = requireNonNull(storeContextFactory);
     this.storageStatistics = requireNonNull(storageStatistics);
     this.readRateLimiter = requireNonNull(readRateLimiter);
     this.writeRateLimiter = requireNonNull(writeRateLimiter);
-    this.auditSpanSource = requireNonNull(auditSpanSource);
     this.storeContext = requireNonNull(storeContextFactory.createStoreContext());
-    this.fsStatistics = fsStatistics;
+
     this.invoker = storeContext.getInvoker();
     this.bucket = storeContext.getBucket();
     this.requestFactory = storeContext.getRequestFactory();
+    addService(clientManager);
+  }
+
+  /**
+   * Create and initialize any subsidiary services, including the input stream factory.
+   * @param conf configuration
+   */
+  @Override
+  protected void serviceInit(final Configuration conf) throws Exception {
+
+    // create and register the stream factory, which will
+    // then follow the service lifecycle
+    objectInputStreamFactory = factoryFromConfig(conf);
+    addService(objectInputStreamFactory);
+
+    // init all child services, including the stream factory
+    super.serviceInit(conf);
+
+    // pass down extra information to the stream factory.
+    finishStreamFactoryInit();
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    initLocalDirAllocator();
+  }
+
+  /**
+   * Return the store path capabilities.
+   * If the object stream factory is non-null, hands off the
+   * query to that factory if not handled here.
+   * @param path path to query the capability of.
+   * @param capability non-null, non-empty string to query the path for support.
+   * @return known capabilities
+   */
+  @Override
+  public boolean hasPathCapability(final Path path, final String capability) {
+    switch (toLowerCase(capability)) {
+    case StreamCapabilities.IOSTATISTICS:

Review Comment:
   AAL doesn't support IOStatistics yet, so this should probably not return true 
   without checking what the stream factory supports.
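
   A rough sketch of what that could look like, assuming the store answers only 
   for capabilities it owns and asks the factory about everything else. 
   `CapabilityProbe`, `DelegatingStore` and the `store.only.feature` name are 
   hypothetical, made up for this example.

```java
import java.util.Locale;

/** Hypothetical probe with the same shape as hasCapability(String). */
interface CapabilityProbe {
  boolean hasCapability(String capability);
}

/** Hypothetical store: answers for its own capabilities, delegates the rest. */
class DelegatingStore {
  // assumed capability name, for illustration only
  static final String STORE_ONLY = "store.only.feature";

  private final CapabilityProbe streamFactory; // may be null

  DelegatingStore(CapabilityProbe streamFactory) {
    this.streamFactory = streamFactory;
  }

  boolean hasPathCapability(String capability) {
    String key = capability.toLowerCase(Locale.ROOT);
    if (STORE_ONLY.equals(key)) {
      return true; // handled by the store itself
    }
    // stream-level capabilities are true only if the factory says so
    return streamFactory != null && streamFactory.hasCapability(key);
  }
}
```

   With this shape, a probe for `iostatistics` is true only when the configured 
   factory reports it.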



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import java.io.IOException;
+
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.service.Service;
+
+/**
+ * A Factory for {@link ObjectInputStream} streams.
+ * <p>
+ * This class is instantiated during initialization of
+ * {@code S3AStore}, it then follows the same service
+ * lifecycle.
+ * <p>
+ * Note for maintainers: do try and keep this mostly stable.
+ * If new parameters need to be added, expand the
+ * {@link ObjectReadParameters} class, rather than change the
+ * interface signature.
+ */
+public interface ObjectInputStreamFactory
+    extends Service, StreamCapabilities {
+
+  /**
+   * Set extra initialization parameters.
+   * This MUST ONLY be invoked between {@code init()}
+   * and {@code start()}.
+   * @param factoryBindingParameters parameters for the factory binding
+   */
+  void bind(FactoryBindingParameters factoryBindingParameters);

Review Comment:
   Discussed offline: this should throw IOException, since createAsyncClient() 
   throws IOException.
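
   A small sketch of the signature change, assuming `bind()` is where the 
   client gets created. `BindableFactory`, `ClientCreatingFactory` and the 
   string-typed parameters are hypothetical simplifications; only the 
   `throws IOException` clause is the point.

```java
import java.io.IOException;

/** Hypothetical factory interface whose bind() may fail with an IOException. */
interface BindableFactory {
  void bind(String bindingParameters) throws IOException;
}

/** Hypothetical factory which creates its client while binding. */
class ClientCreatingFactory implements BindableFactory {
  private final boolean failClientCreation;
  private String client;

  ClientCreatingFactory(boolean failClientCreation) {
    this.failClientCreation = failClientCreation;
  }

  /** Stand-in for client creation, which can fail with an IOException. */
  private String createAsyncClient() throws IOException {
    if (failClientCreation) {
      throw new IOException("client creation failed");
    }
    return "async-client";
  }

  @Override
  public void bind(String bindingParameters) throws IOException {
    // the checked exception propagates as-is; no wrapping in an unchecked one
    client = createAsyncClient();
  }
}

/** Shows the caller handling the failure explicitly. */
final class BindDemo {
  static String tryBind(boolean fail) {
    BindableFactory factory = new ClientCreatingFactory(fail);
    try {
      factory.bind("params");
      return "bound";
    } catch (IOException e) {
      return "failed: " + e.getMessage();
    }
  }
}
```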



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.Preconditions;
+
+import static org.apache.hadoop.util.StringUtils.toLowerCase;
+
+/**
+ * Base implementation of {@link ObjectInputStreamFactory}.
+ */
+public abstract class AbstractObjectInputStreamFactory extends AbstractService
+    implements ObjectInputStreamFactory {
+
+  /**
+   * Parameters passed down in
+   * {@link #bind(FactoryBindingParameters)}.
+   */
+  private FactoryBindingParameters bindingParameters;
+
+  /**
+   * Callbacks.
+   */
+  private StreamFactoryCallbacks callbacks;
+
+  protected AbstractObjectInputStreamFactory(final String name) {
+    super(name);
+  }
+
+  /**
+   * Bind to the callbacks.
+   * <p>
+   * The base class checks service state then stores
+   * the callback interface.
+   * @param factoryBindingParameters parameters for the factory binding
+   */
+  @Override
+  public void bind(final FactoryBindingParameters factoryBindingParameters) {
+    // must only be invoked during service initialization
+    Preconditions.checkState(isInState(STATE.INITED),
+        "Input Stream factory %s is in wrong state: %s",
+        this, getServiceState());
+    bindingParameters = factoryBindingParameters;
+    callbacks = bindingParameters.callbacks();
+  }
+
+  /**
+   * Return base capabilities of all stream factories,
+   * defining what the base ObjectInputStream class does.
+   * This also includes the probe for stream type capability.
+   * @param capability string to query the stream support for.
+   * @return true if implemented
+   */
+  @Override
+  public boolean hasCapability(final String capability) {
+    switch (toLowerCase(capability)) {
+    case StreamCapabilities.IOSTATISTICS:
+    case StreamStatisticNames.STREAM_LEAKS:

Review Comment:
   Similar to the above: I think we should let each stream factory define its 
   own capabilities rather than declaring them in the parent class, as AAL does 
   not support IOStatistics or stream leaks right now.
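
   Roughly what I have in mind, as a sketch only: the base class declares 
   nothing itself and each factory supplies its own capability set. The class 
   names and the `iostatistics` / `stream_leaks` strings here are assumptions 
   for illustration, not the real constants.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

/** Hypothetical base class: declares no capabilities of its own. */
abstract class SketchStreamFactory {
  /** Lower-case names of the capabilities this factory's streams support. */
  protected abstract Set<String> supportedCapabilities();

  public final boolean hasCapability(String capability) {
    return supportedCapabilities().contains(capability.toLowerCase(Locale.ROOT));
  }
}

/** Hypothetical factory whose streams support statistics and leak tracking. */
class ClassicSketchFactory extends SketchStreamFactory {
  private static final Set<String> CAPABILITIES = Collections.unmodifiableSet(
      new HashSet<>(Arrays.asList("iostatistics", "stream_leaks")));

  @Override
  protected Set<String> supportedCapabilities() {
    return CAPABILITIES;
  }
}

/** Hypothetical factory which does not declare either capability yet. */
class AnalyticsSketchFactory extends SketchStreamFactory {
  @Override
  protected Set<String> supportedCapabilities() {
    return Collections.emptySet();
  }
}
```

   A factory that gains a capability later only has to extend its own set; the 
   parent class stays unchanged.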



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
