This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch clean-up-filesystem
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 3f6091c8132d6326f538916187f62c2ed01e05b9
Author: kishoreg <[email protected]>
AuthorDate: Tue Dec 17 17:42:18 2019 -0800

    Removing pinot-common dependency from pinot-hadoop-filesystem and pinot-azure-filesystem
---
 pinot-azure-filesystem/pom.xml                     |   2 +-
 .../org/apache/pinot/filesystem/AzurePinotFS.java  |  15 ++-
 .../apache/pinot/common/utils/CommonConstants.java |  12 --
 .../org/apache/pinot/filesystem/LocalPinotFS.java  |   2 +-
 .../apache/pinot/filesystem/PinotFSDelegator.java  | 147 +++++++++++++++++++++
 .../apache/pinot/filesystem/PinotFSFactory.java    |   5 +-
 pinot-hadoop-filesystem/pom.xml                    |   2 +-
 .../org/apache/pinot/filesystem/HadoopPinotFS.java |  60 +++------
 .../pinot/filesystem/HdfsSegmentFetcher.java       | 135 -------------------
 .../org/apache/pinot/spi/filesystem/PinotFS.java   |   2 +-
 10 files changed, 184 insertions(+), 198 deletions(-)

diff --git a/pinot-azure-filesystem/pom.xml b/pinot-azure-filesystem/pom.xml
index ef5af4c..0880436 100644
--- a/pinot-azure-filesystem/pom.xml
+++ b/pinot-azure-filesystem/pom.xml
@@ -50,7 +50,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-common</artifactId>
+      <artifactId>pinot-spi</artifactId>
     </dependency>
   </dependencies>
 </project>
diff --git 
a/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
 
b/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
index 2a2a51c..b57ae98 100644
--- 
a/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
+++ 
b/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
@@ -41,7 +41,6 @@ import java.util.List;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +55,10 @@ public class AzurePinotFS extends PinotFS {
   private static final int BUFFER_SIZE = 4096;
   private ADLStoreClient _adlStoreClient;
   private static final String[] EMPTY_ARR = new String[0];
+  private static final String ACCOUNT_ID = "accountId";
+  private static final String AUTH_ENDPOINT = "authEndpoint";
+  private static final String CLIENT_ID = "clientId";
+  private static final String CLIENT_SECRET = "clientSecret";
 
   public AzurePinotFS() {
 
@@ -69,14 +72,14 @@ public class AzurePinotFS extends PinotFS {
   @Override
   public void init(Configuration config) {
     // The ADL account id. Example: {@code mystore.azuredatalakestore.net}.
-    String account = 
config.getString(CommonConstants.SegmentOperations.AzureSegmentOperations.ACCOUNT_ID);
+    String account = config.getString(ACCOUNT_ID);
     // The endpoint that should be used for authentication.
     // Usually of the form {@code 
https://login.microsoftonline.com/<tenant-id>/oauth2/token}.
-    String authEndpoint = 
config.getString(CommonConstants.SegmentOperations.AzureSegmentOperations.AUTH_ENDPOINT);
+    String authEndpoint = config.getString(AUTH_ENDPOINT);
     // The clientId used to authenticate this application
-    String clientId = 
config.getString(CommonConstants.SegmentOperations.AzureSegmentOperations.CLIENT_ID);
+    String clientId = config.getString(CLIENT_ID);
     // The secret key used to authenticate this application
-    String clientSecret = 
config.getString(CommonConstants.SegmentOperations.AzureSegmentOperations.CLIENT_SECRET);
+    String clientSecret = config.getString(CLIENT_SECRET);
 
     AccessTokenProvider tokenProvider = new 
ClientCredsTokenProvider(authEndpoint, clientId, clientSecret);
     _adlStoreClient = ADLStoreClient.createClient(account, tokenProvider);
@@ -99,7 +102,7 @@ public class AzurePinotFS extends PinotFS {
   }
 
   @Override
-  protected boolean doMove(URI srcUri, URI dstUri)
+  public boolean doMove(URI srcUri, URI dstUri)
       throws IOException {
     return _adlStoreClient.rename(srcUri.getPath(), dstUri.getPath());
   }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 6b3b43f..476ae4f 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -372,18 +372,6 @@ public class CommonConstants {
   }
 
   public static class SegmentOperations {
-    public static class HadoopSegmentOperations {
-      public static final String PRINCIPAL = "hadoop.kerberos.principle";
-      public static final String KEYTAB = "hadoop.kerberos.keytab";
-      public static final String HADOOP_CONF_PATH = "hadoop.conf.path";
-    }
-
-    public static class AzureSegmentOperations {
-      public static final String ACCOUNT_ID = "accountId";
-      public static final String AUTH_ENDPOINT = "authEndpoint";
-      public static final String CLIENT_ID = "clientId";
-      public static final String CLIENT_SECRET = "clientSecret";
-    }
 
     public static final String RETRY = "retry.count";
     public static final int RETRY_DEFAULT = 3;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java 
b/pinot-common/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
index f2fd595..fb52845 100644
--- a/pinot-common/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
+++ b/pinot-common/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
@@ -70,7 +70,7 @@ public class LocalPinotFS extends PinotFS {
   }
 
   @Override
-  protected boolean doMove(URI srcUri, URI dstUri)
+  public boolean doMove(URI srcUri, URI dstUri)
       throws IOException {
     File srcFile = toFile(srcUri);
     File dstFile = toFile(dstUri);
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/filesystem/PinotFSDelegator.java 
b/pinot-common/src/main/java/org/apache/pinot/filesystem/PinotFSDelegator.java
new file mode 100644
index 0000000..9b645f2
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/filesystem/PinotFSDelegator.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.filesystem;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.utils.retry.RetryPolicies;
+import org.apache.pinot.common.utils.retry.RetryPolicy;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.common.utils.CommonConstants.SegmentOperations.RETRY;
+import static 
org.apache.pinot.common.utils.CommonConstants.SegmentOperations.RETRY_DEFAULT;
+import static 
org.apache.pinot.common.utils.CommonConstants.SegmentOperations.RETRY_WAITIME_MS;
+import static 
org.apache.pinot.common.utils.CommonConstants.SegmentOperations.RETRY_WAITIME_MS_DEFAULT;
+
+
+public class PinotFSDelegator extends PinotFS {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotFSDelegator.class);
+
+  private PinotFS _pinotFS;
+
+  private int _retryCount = RETRY_DEFAULT;
+  private int _retryWaitMs = RETRY_WAITIME_MS_DEFAULT;
+
+  PinotFSDelegator(PinotFS pinotFS) {
+
+    _pinotFS = pinotFS;
+  }
+
+  @Override
+  public void init(Configuration config) {
+    _retryCount = config.getInt(RETRY, _retryCount);
+    _retryWaitMs = config.getInt(RETRY_WAITIME_MS, _retryWaitMs);
+    _pinotFS.init(config);
+  }
+
+  @Override
+  public boolean mkdir(URI uri)
+      throws IOException {
+    return _pinotFS.mkdir(uri);
+  }
+
+  @Override
+  public boolean delete(URI segmentUri, boolean forceDelete)
+      throws IOException {
+    return _pinotFS.delete(segmentUri, forceDelete);
+  }
+
+  @Override
+  public boolean doMove(URI srcUri, URI dstUri)
+      throws IOException {
+    return _pinotFS.doMove(srcUri, dstUri);
+  }
+
+  @Override
+  public boolean copy(URI srcUri, URI dstUri)
+      throws IOException {
+    return _pinotFS.copy(srcUri, dstUri);
+  }
+
+  @Override
+  public boolean exists(URI fileUri)
+      throws IOException {
+    return _pinotFS.exists(fileUri);
+  }
+
+  @Override
+  public long length(URI fileUri)
+      throws IOException {
+    return _pinotFS.length(fileUri);
+  }
+
+  @Override
+  public String[] listFiles(URI fileUri, boolean recursive)
+      throws IOException {
+    return _pinotFS.listFiles(fileUri, recursive);
+  }
+
+  @Override
+  public void copyToLocalFile(URI srcUri, File dstFile)
+      throws Exception {
+    RetryPolicy fixedDelayRetryPolicy = 
RetryPolicies.fixedDelayRetryPolicy(_retryCount, _retryWaitMs);
+    fixedDelayRetryPolicy.attempt(() -> {
+      try {
+        long startMs = System.currentTimeMillis();
+        _pinotFS.copyToLocalFile(srcUri, dstFile);
+        LOGGER.debug("copied {} to {},  size {}, took {} ms", srcUri, dstFile,
+            dstFile.length(), System.currentTimeMillis() - startMs);
+
+        return true;
+      } catch (Exception e) {
+        return false;
+      }
+    });
+  }
+
+  @Override
+  public void copyFromLocalFile(File srcFile, URI dstUri)
+      throws Exception {
+    _pinotFS.copyFromLocalFile(srcFile, dstUri);
+  }
+
+  @Override
+  public boolean isDirectory(URI uri)
+      throws IOException {
+    return _pinotFS.isDirectory(uri);
+  }
+
+  @Override
+  public long lastModified(URI uri)
+      throws IOException {
+    return _pinotFS.lastModified(uri);
+  }
+
+  @Override
+  public boolean touch(URI uri)
+      throws IOException {
+    return _pinotFS.touch(uri);
+  }
+
+  @Override
+  public InputStream open(URI uri)
+      throws IOException {
+    return _pinotFS.open(uri);
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/filesystem/PinotFSFactory.java 
b/pinot-common/src/main/java/org/apache/pinot/filesystem/PinotFSFactory.java
index bbf2f43..bbe1564 100644
--- a/pinot-common/src/main/java/org/apache/pinot/filesystem/PinotFSFactory.java
+++ b/pinot-common/src/main/java/org/apache/pinot/filesystem/PinotFSFactory.java
@@ -48,8 +48,9 @@ public class PinotFSFactory {
     try {
       LOGGER.info("Initializing PinotFS for scheme {}, classname {}", scheme, 
fsClassName);
       PinotFS pinotFS = PluginManager.get().createInstance(fsClassName);
-      pinotFS.init(configuration);
-      _fileSystemMap.put(scheme, pinotFS);
+      PinotFSDelegator delegator = new PinotFSDelegator(pinotFS);
+      delegator.init(configuration);
+      _fileSystemMap.put(scheme, delegator);
     } catch (Exception e) {
       LOGGER.error("Could not instantiate file system for class {} with scheme 
{}", fsClassName, scheme, e);
       throw new RuntimeException(e);
diff --git a/pinot-hadoop-filesystem/pom.xml b/pinot-hadoop-filesystem/pom.xml
index 36d8dea..5ca139a 100644
--- a/pinot-hadoop-filesystem/pom.xml
+++ b/pinot-hadoop-filesystem/pom.xml
@@ -73,7 +73,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-common</artifactId>
+      <artifactId>pinot-spi</artifactId>
     </dependency>
   </dependencies>
 </project>
diff --git 
a/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
 
b/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
index 1339b6a..e69eba5 100644
--- 
a/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
+++ 
b/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
@@ -33,29 +33,21 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.pinot.common.utils.retry.RetryPolicies;
-import org.apache.pinot.common.utils.retry.RetryPolicy;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static 
org.apache.pinot.common.utils.CommonConstants.SegmentOperations.HadoopSegmentOperations.HADOOP_CONF_PATH;
-import static 
org.apache.pinot.common.utils.CommonConstants.SegmentOperations.HadoopSegmentOperations.KEYTAB;
-import static 
org.apache.pinot.common.utils.CommonConstants.SegmentOperations.HadoopSegmentOperations.PRINCIPAL;
-import static 
org.apache.pinot.common.utils.CommonConstants.SegmentOperations.RETRY;
-import static 
org.apache.pinot.common.utils.CommonConstants.SegmentOperations.RETRY_DEFAULT;
-import static 
org.apache.pinot.common.utils.CommonConstants.SegmentOperations.RETRY_WAITIME_MS;
-import static 
org.apache.pinot.common.utils.CommonConstants.SegmentOperations.RETRY_WAITIME_MS_DEFAULT;
-
-
 /**
  * Implementation of PinotFS for the Hadoop Filesystem
  */
 public class HadoopPinotFS extends PinotFS {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HadoopPinotFS.class);
+
+  private static final String PRINCIPAL = "hadoop.kerberos.principle";
+  private static final String KEYTAB = "hadoop.kerberos.keytab";
+  private static final String HADOOP_CONF_PATH = "hadoop.conf.path";
+
   private org.apache.hadoop.fs.FileSystem _hadoopFS = null;
-  private int _retryCount = RETRY_DEFAULT;
-  private int _retryWaitMs = RETRY_WAITIME_MS_DEFAULT;
   private org.apache.hadoop.conf.Configuration _hadoopConf;
 
   public HadoopPinotFS() {
@@ -65,8 +57,6 @@ public class HadoopPinotFS extends PinotFS {
   @Override
   public void init(Configuration config) {
     try {
-      _retryCount = config.getInt(RETRY, _retryCount);
-      _retryWaitMs = config.getInt(RETRY_WAITIME_MS, _retryWaitMs);
       _hadoopConf = getConf(config.getString(HADOOP_CONF_PATH));
       authenticate(_hadoopConf, config);
       _hadoopFS = org.apache.hadoop.fs.FileSystem.get(_hadoopConf);
@@ -93,7 +83,7 @@ public class HadoopPinotFS extends PinotFS {
   }
 
   @Override
-  protected boolean doMove(URI srcUri, URI dstUri)
+  public boolean doMove(URI srcUri, URI dstUri)
       throws IOException {
     return _hadoopFS.rename(new Path(srcUri), new Path(dstUri));
   }
@@ -151,13 +141,14 @@ public class HadoopPinotFS extends PinotFS {
     return retArray;
   }
 
-  private List<FileStatus> listStatus(Path path, boolean recursive) throws 
IOException {
+  private List<FileStatus> listStatus(Path path, boolean recursive)
+      throws IOException {
     List<FileStatus> fileStatuses = new ArrayList<>();
     FileStatus[] files = _hadoopFS.listStatus(path);
     for (FileStatus file : files) {
       fileStatuses.add(file);
       if (file.isDirectory() && recursive) {
-        List<FileStatus> subFiles =listStatus(file.getPath(), true);
+        List<FileStatus> subFiles = listStatus(file.getPath(), true);
         fileStatuses.addAll(subFiles);
       }
     }
@@ -169,28 +160,19 @@ public class HadoopPinotFS extends PinotFS {
       throws Exception {
     LOGGER.debug("starting to fetch segment from hdfs");
     final String dstFilePath = dstFile.getAbsolutePath();
-    try {
-      final Path remoteFile = new Path(srcUri);
-      final Path localFile = new Path(dstFile.toURI());
 
-      RetryPolicy fixedDelayRetryPolicy = 
RetryPolicies.fixedDelayRetryPolicy(_retryCount, _retryWaitMs);
-      fixedDelayRetryPolicy.attempt(() -> {
-        try {
-          if (_hadoopFS == null) {
-            throw new RuntimeException("_hadoopFS client is not initialized 
when trying to copy files");
-          }
-          long startMs = System.currentTimeMillis();
-          _hadoopFS.copyToLocalFile(remoteFile, localFile);
-          LOGGER.debug("copied {} from hdfs to {} in local for size {}, take 
{} ms", srcUri, dstFilePath,
-              dstFile.length(), System.currentTimeMillis() - startMs);
-          return true;
-        } catch (IOException e) {
-          LOGGER.warn("failed to fetch segment {} from hdfs, might retry", 
srcUri, e);
-          return false;
-        }
-      });
-    } catch (Exception e) {
-      LOGGER.error("failed to fetch {} from hdfs to local {}", srcUri, 
dstFilePath, e);
+    final Path remoteFile = new Path(srcUri);
+    final Path localFile = new Path(dstFile.toURI());
+    try {
+      if (_hadoopFS == null) {
+        throw new RuntimeException("_hadoopFS client is not initialized when 
trying to copy files");
+      }
+      long startMs = System.currentTimeMillis();
+      _hadoopFS.copyToLocalFile(remoteFile, localFile);
+      LOGGER.debug("copied {} from hdfs to {} in local for size {}, take {} 
ms", srcUri, dstFilePath, dstFile.length(),
+          System.currentTimeMillis() - startMs);
+    } catch (IOException e) {
+      LOGGER.warn("failed to fetch segment {} from hdfs, might retry", srcUri, 
e);
       throw e;
     }
   }
diff --git 
a/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HdfsSegmentFetcher.java
 
b/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HdfsSegmentFetcher.java
deleted file mode 100644
index e56d6dd..0000000
--- 
a/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HdfsSegmentFetcher.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.filesystem;
-
-import com.google.common.base.Strings;
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.pinot.common.segment.fetcher.SegmentFetcher;
-import org.apache.pinot.common.utils.retry.RetryPolicies;
-import org.apache.pinot.common.utils.retry.RetryPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static 
org.apache.pinot.common.utils.CommonConstants.SegmentOperations.HadoopSegmentOperations.HADOOP_CONF_PATH;
-import static 
org.apache.pinot.common.utils.CommonConstants.SegmentOperations.HadoopSegmentOperations.KEYTAB;
-import static 
org.apache.pinot.common.utils.CommonConstants.SegmentOperations.HadoopSegmentOperations.PRINCIPAL;
-import static 
org.apache.pinot.common.utils.CommonConstants.SegmentOperations.RETRY;
-import static 
org.apache.pinot.common.utils.CommonConstants.SegmentOperations.RETRY_DEFAULT;
-import static 
org.apache.pinot.common.utils.CommonConstants.SegmentOperations.RETRY_WAITIME_MS;
-import static 
org.apache.pinot.common.utils.CommonConstants.SegmentOperations.RETRY_WAITIME_MS_DEFAULT;
-
-
-// Use PinotFSSegmentFetcher instead
-@Deprecated
-public class HdfsSegmentFetcher implements SegmentFetcher {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(HdfsSegmentFetcher.class);
-  private FileSystem _hadoopFS = null;
-  private int _retryCount = RETRY_DEFAULT;
-  private int _retryWaitMs = RETRY_WAITIME_MS_DEFAULT;
-
-  @Override
-  public void init(org.apache.commons.configuration.Configuration configs) {
-    try {
-      _retryCount = configs.getInt(RETRY, _retryCount);
-      _retryWaitMs = configs.getInt(RETRY_WAITIME_MS, _retryWaitMs);
-      Configuration hadoopConf = getConf(configs.getString(HADOOP_CONF_PATH));
-      authenticate(hadoopConf, configs);
-      _hadoopFS = FileSystem.get(hadoopConf);
-      LOGGER.info("successfully initialized hdfs segment fetcher");
-    } catch (Exception e) {
-      LOGGER.error("failed to initialized the hdfs segment fetcher", e);
-    }
-  }
-
-  private Configuration getConf(String hadoopConfPath) {
-    Configuration hadoopConf = new Configuration();
-    if (Strings.isNullOrEmpty(hadoopConfPath)) {
-      LOGGER.warn("no hadoop conf path is provided, will rely on default 
config");
-    } else {
-      hadoopConf.addResource(new Path(hadoopConfPath, "core-site.xml"));
-      hadoopConf.addResource(new Path(hadoopConfPath, "hdfs-site.xml"));
-    }
-    return hadoopConf;
-  }
-
-  private void authenticate(Configuration hadoopConf, 
org.apache.commons.configuration.Configuration configs) {
-    String principal = configs.getString(PRINCIPAL);
-    String keytab = configs.getString(KEYTAB);
-    if (!Strings.isNullOrEmpty(principal) && !Strings.isNullOrEmpty(keytab)) {
-      UserGroupInformation.setConfiguration(hadoopConf);
-      if (UserGroupInformation.isSecurityEnabled()) {
-        try {
-          if (!UserGroupInformation.getCurrentUser().hasKerberosCredentials() 
|| !UserGroupInformation.getCurrentUser()
-              .getUserName().equals(principal)) {
-            LOGGER.info("Trying to authenticate user [%s] with keytab [%s]..", 
principal, keytab);
-            UserGroupInformation.loginUserFromKeytab(principal, keytab);
-          }
-        } catch (IOException e) {
-          throw new RuntimeException(
-              String.format("Failed to authenticate user principal [%s] with 
keytab [%s]", principal, keytab), e);
-        }
-      }
-    }
-  }
-
-  @Override
-  public void fetchSegmentToLocal(final String uri, final File tempFile)
-      throws Exception {
-    LOGGER.debug("starting to fetch segment from hdfs");
-    final String dstFilePath = tempFile.getAbsolutePath();
-    try {
-      final Path remoteFile = new Path(uri);
-      final Path localFile = new Path(tempFile.toURI());
-
-      RetryPolicy fixedDelayRetryPolicy = 
RetryPolicies.fixedDelayRetryPolicy(_retryCount, _retryWaitMs);
-      fixedDelayRetryPolicy.attempt(() -> {
-        try {
-          if (_hadoopFS == null) {
-            throw new RuntimeException("_hadoopFS client is not initialized 
when trying to copy files");
-          }
-          long startMs = System.currentTimeMillis();
-          _hadoopFS.copyToLocalFile(remoteFile, localFile);
-          LOGGER
-              .debug("copied {} from hdfs to {} in local for size {}, take {} 
ms", uri, dstFilePath, tempFile.length(),
-                  System.currentTimeMillis() - startMs);
-          return true;
-        } catch (IOException e) {
-          LOGGER.warn("failed to fetch segment {} from hdfs, might retry", 
uri, e);
-          return false;
-        }
-      });
-    } catch (Exception e) {
-      LOGGER.error("failed to fetch {} from hdfs to local {}", uri, 
dstFilePath, e);
-      throw e;
-    }
-  }
-
-  @Override
-  public Set<String> getProtectedConfigKeys() {
-    return Collections.emptySet();
-  }
-}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
index 0d2f9d5..1203498 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
@@ -111,7 +111,7 @@ public abstract class PinotFS implements Closeable {
   /**
    * Does the actual behavior of move in each FS.
    */
-  protected abstract boolean doMove(URI srcUri, URI dstUri)
+  public abstract boolean doMove(URI srcUri, URI dstUri)
       throws IOException;
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to