This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d68ef7b  Fix HDFS copy logic (#5218)
d68ef7b is described below

commit d68ef7b1cb57a95e159a431b909cf45bef1e6a66
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Apr 16 20:02:35 2020 -0700

    Fix HDFS copy logic (#5218)
    
    * Make Quickstart to log error to consoles
    
    * Update HDFS copy logic
    
    * Adding test
---
 .../pinot/plugin/filesystem/HadoopPinotFS.java     | 38 +++++++++-----
 .../pinot/plugin/filesystem/HadoopPinotFSTest.java | 58 ++++++++++++++++++++++
 pinot-tools/src/main/resources/log4j2.xml          |  3 ++
 3 files changed, 87 insertions(+), 12 deletions(-)

diff --git 
a/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java
 
b/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java
index f9bb03d..3fe5c0b 100644
--- 
a/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java
+++ 
b/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java
@@ -16,26 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pinot.plugin.filesystem;
 
 import com.google.common.base.Strings;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.commons.configuration.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Implementation of PinotFS for the Hadoop Filesystem
@@ -46,6 +47,7 @@ public class HadoopPinotFS extends PinotFS {
   private static final String PRINCIPAL = "hadoop.kerberos.principle";
   private static final String KEYTAB = "hadoop.kerberos.keytab";
   private static final String HADOOP_CONF_PATH = "hadoop.conf.path";
+  private static final String WRITE_CHECKSUM = "hadoop.write.checksum";
 
   private org.apache.hadoop.fs.FileSystem _hadoopFS = null;
   private org.apache.hadoop.conf.Configuration _hadoopConf;
@@ -60,6 +62,7 @@ public class HadoopPinotFS extends PinotFS {
       _hadoopConf = getConf(config.getString(HADOOP_CONF_PATH));
       authenticate(_hadoopConf, config);
       _hadoopFS = org.apache.hadoop.fs.FileSystem.get(_hadoopConf);
+      _hadoopFS.setWriteChecksum((config.getBoolean(WRITE_CHECKSUM, false)));
       LOGGER.info("successfully initialized HadoopPinotFS");
     } catch (IOException e) {
       throw new RuntimeException("Could not initialize HadoopPinotFS", e);
@@ -97,13 +100,24 @@ public class HadoopPinotFS extends PinotFS {
       throws IOException {
     Path source = new Path(srcUri);
     Path target = new Path(dstUri);
-    RemoteIterator<LocatedFileStatus> sourceFiles = 
_hadoopFS.listFiles(source, true);
+    RemoteIterator<FileStatus> sourceFiles = 
_hadoopFS.listStatusIterator(source);
     if (sourceFiles != null) {
       while (sourceFiles.hasNext()) {
-        boolean succeeded =
-            FileUtil.copy(_hadoopFS, sourceFiles.next().getPath(), _hadoopFS, 
target, true, _hadoopConf);
-        if (!succeeded) {
-          return false;
+        FileStatus sourceFile = sourceFiles.next();
+        Path sourceFilePath = sourceFile.getPath();
+        if (sourceFile.isFile()) {
+          try {
+            FileUtil.copy(_hadoopFS, sourceFilePath, _hadoopFS, new 
Path(target, sourceFilePath.getName()), false,
+                _hadoopConf);
+          } catch (FileNotFoundException e) {
+            LOGGER.warn("Not found file {}, skipping copying it...", 
sourceFilePath, e);
+          }
+        } else if (sourceFile.isDirectory()) {
+          try {
+            copy(sourceFilePath.toUri(), new Path(target, 
sourceFilePath.getName()).toUri());
+          } catch (FileNotFoundException e) {
+            LOGGER.warn("Not found directory {}, skipping copying it...", 
sourceFilePath, e);
+          }
         }
       }
     }
diff --git 
a/pinot-plugins/pinot-file-system/pinot-hdfs/src/test/java/org/apache/pinot/plugin/filesystem/HadoopPinotFSTest.java
 
b/pinot-plugins/pinot-file-system/pinot-hdfs/src/test/java/org/apache/pinot/plugin/filesystem/HadoopPinotFSTest.java
new file mode 100644
index 0000000..d4e3ca0
--- /dev/null
+++ 
b/pinot-plugins/pinot-file-system/pinot-hdfs/src/test/java/org/apache/pinot/plugin/filesystem/HadoopPinotFSTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.plugin.filesystem;
+
+import java.io.IOException;
+import java.net.URI;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link HadoopPinotFS}. No Hadoop configuration path is supplied to init(), so the
+ * underlying file system is presumably the local one (TODO confirm against getConf(null)).
+ */
+public class HadoopPinotFSTest {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopPinotFSTest.class);
+
+  private static final String TMP_DIR = System.getProperty("java.io.tmpdir");
+
+  /**
+   * Creates a source tree (src/dir/1, src/dir/2), copies it to dest and verifies that the whole
+   * tree is reproduced under dest while the source files are left in place.
+   */
+  @Test
+  public void testCopy()
+      throws IOException {
+    // Build the URI through Hadoop's Path instead of string concatenation: URI.create() rejects
+    // backslash separators and unescaped characters (e.g. spaces) that java.io.tmpdir may contain.
+    URI baseURI = new Path(TMP_DIR, "HadoopPinotFSTest").toUri();
+    String basePath = baseURI.getPath();
+    HadoopPinotFS hadoopFS = new HadoopPinotFS();
+    hadoopFS.init(new BaseConfiguration());
+    // Start from a clean slate in case an earlier run was aborted before its cleanup ran.
+    if (hadoopFS.exists(baseURI)) {
+      hadoopFS.delete(baseURI, true);
+    }
+    try {
+      URI srcURI = new Path(basePath, "src").toUri();
+      URI destURI = new Path(basePath, "dest").toUri();
+      hadoopFS.mkdir(srcURI);
+      hadoopFS.mkdir(new Path(basePath, "src/dir").toUri());
+      hadoopFS.touch(new Path(basePath, "src/dir/1").toUri());
+      hadoopFS.touch(new Path(basePath, "src/dir/2").toUri());
+      // Expected recursive listing of src: "dir", "dir/1" and "dir/2".
+      String[] srcFiles = hadoopFS.listFiles(srcURI, true);
+      Assert.assertEquals(srcFiles.length, 3);
+
+      hadoopFS.copy(srcURI, destURI);
+
+      Assert.assertTrue(hadoopFS.exists(destURI));
+      Assert.assertTrue(hadoopFS.exists(new Path(basePath, "dest/dir").toUri()));
+      Assert.assertTrue(hadoopFS.exists(new Path(basePath, "dest/dir/1").toUri()));
+      Assert.assertTrue(hadoopFS.exists(new Path(basePath, "dest/dir/2").toUri()));
+      String[] destFiles = hadoopFS.listFiles(destURI, true);
+      Assert.assertEquals(destFiles.length, 3);
+      // copy() must leave the source in place (the previous implementation passed deleteSource=true).
+      Assert.assertTrue(hadoopFS.exists(new Path(basePath, "src/dir/1").toUri()));
+      Assert.assertTrue(hadoopFS.exists(new Path(basePath, "src/dir/2").toUri()));
+    } finally {
+      // Always remove the scratch tree so a failed assertion does not leave files behind.
+      if (!hadoopFS.delete(baseURI, true)) {
+        LOGGER.warn("Failed to clean up test directory {}", baseURI);
+      }
+    }
+  }
+}
diff --git a/pinot-tools/src/main/resources/log4j2.xml 
b/pinot-tools/src/main/resources/log4j2.xml
index 5c59ecc..635cf37 100644
--- a/pinot-tools/src/main/resources/log4j2.xml
+++ b/pinot-tools/src/main/resources/log4j2.xml
@@ -48,6 +48,9 @@
       <AppenderRef ref="console"/>
     </Root>
     <Logger name="org.apache.pinot" level="info" additivity="false"/>
+    <Logger name="org.apache.pinot" level="error" additivity="false">
+      <AppenderRef ref="console"/>
+    </Logger>
     <Logger name="org.apache.pinot.tools" level="info" additivity="false">
       <AppenderRef ref="console"/>
     </Logger>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to