Repository: carbondata
Updated Branches:
  refs/heads/carbonstore 7ad2fd951 -> 2d4628868


[CARBONDATA-2767][CarbonStore] Fix task locality issue

If the Spark cluster and the Hadoop cluster are two different machine clusters,
the Spark tasks will run in RACK_LOCAL mode.

This closes #2528


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2d462886
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2d462886
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2d462886

Branch: refs/heads/carbonstore
Commit: 2d4628868fc9d07ad1f8c84c6e458f48b0a13998
Parents: 7ad2fd9
Author: QiangCai <[email protected]>
Authored: Thu Jul 19 14:50:38 2018 +0800
Committer: Jacky Li <[email protected]>
Committed: Wed Jul 25 20:50:42 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   9 ++
 .../carbondata/core/util/CarbonProperties.java  |   7 +
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  15 ++-
 pom.xml                                         |   3 +
 .../horizon/rest/controller/Horizon.java        |  24 +---
 .../rest/controller/HorizonController.java      |  27 +++-
 store/sql/pom.xml                               | 133 +++++++++++++++++++
 .../horizon/rest/controller/SqlHorizon.java     |  70 ++++++----
 .../carbondata/horizon/rest/util/Upload.java    |  94 +++++++++++++
 9 files changed, 337 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d462886/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 75d6014..fb8e221 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1882,6 +1882,15 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "true";
 
+  /**
+   * config carbon scan task locality
+   * true: it will execute tasks as close to the data, the locality is 
important,
+   * false: it will execute tasks immediately, not paying attention to the 
locality
+   */
+  public static final String CARBON_TASK_LOCALITY = "carbon.task.locality";
+
+  public static final String CARBON_TASK_LOCALITY_DEFAULT = "true";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d462886/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index b60e6a3..49eb7fb 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1655,4 +1655,11 @@ public final class CarbonProperties {
       return CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT;
     }
   }
+
+  public static boolean isTaskLocality() {
+    String taskLocality = getInstance().getProperty(
+        CarbonCommonConstants.CARBON_TASK_LOCALITY,
+        CarbonCommonConstants.CARBON_TASK_LOCALITY_DEFAULT);
+    return taskLocality.equalsIgnoreCase("true");
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d462886/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 149f711..1434b52 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -87,6 +87,8 @@ class CarbonScanRDD[T: ClassTag](
   }
   private var vectorReader = false
 
+  @transient private val isTaskLocality = CarbonProperties.isTaskLocality
+
   private val bucketedTable = tableInfo.getFactTable.getBucketingInfo
   private val storageFormat = tableInfo.getFormat
 
@@ -739,9 +741,16 @@ class CarbonScanRDD[T: ClassTag](
    * Get the preferred locations where to launch this task.
    */
   override def getPreferredLocations(split: Partition): Seq[String] = {
-    val theSplit = split.asInstanceOf[CarbonSparkPartition]
-    val firstOptionLocation = theSplit.split.value.getLocations.filter(_ != 
"localhost")
-    firstOptionLocation
+    if (isTaskLocality) {
+      split.asInstanceOf[CarbonSparkPartition]
+        .split
+        .value
+        .getLocations
+        .filter(_ != "localhost")
+    } else {
+      // when the computation and the storage are separated, not require the 
preferred locations
+      Seq.empty[String]
+    }
   }
 
   def createVectorizedCarbonRecordReader(queryModel: QueryModel,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d462886/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2135b9d..e10acc7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -586,6 +586,9 @@
     </profile>
     <profile>
       <id>horizon</id>
+      <properties>
+        <hadoop.version>2.8.3</hadoop.version>
+      </properties>
       <modules>
         <module>store/horizon</module>
         <module>store/sql</module>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d462886/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java
----------------------------------------------------------------------
diff --git 
a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java
 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java
index 9d876ab..a30b587 100644
--- 
a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java
+++ 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java
@@ -17,9 +17,6 @@
 
 package org.apache.carbondata.horizon.rest.controller;
 
-import java.io.File;
-import java.io.IOException;
-
 import org.apache.carbondata.store.api.conf.StoreConf;
 
 import org.springframework.boot.SpringApplication;
@@ -33,31 +30,24 @@ public class Horizon {
 
   public static void main(String[] args) {
     String storeConfFile = System.getProperty(StoreConf.STORE_CONF_FILE);
-    if (storeConfFile == null) {
-      storeConfFile = getStoreConfFile();
-    }
     start(storeConfFile);
   }
 
-  static String getStoreConfFile() {
-    try {
-      return new File(".").getCanonicalPath() + "/store/conf/store.conf";
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   public static void start(String storeConfFile) {
     start(Horizon.class, storeConfFile);
   }
 
   static <T> void start(final Class<T> classTag, String storeConfFile) {
-    System.setProperty("carbonstore.conf.file", storeConfFile);
-    new Thread() {
+    if (storeConfFile != null) {
+      System.setProperty("carbonstore.conf.file", storeConfFile);
+    }
+    Thread thread = new Thread() {
       public void run() {
         context = SpringApplication.run(classTag);
       }
-    }.start();
+    };
+    thread.setDaemon(true);
+    thread.start();
   }
 
   public static void stop() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d462886/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java
----------------------------------------------------------------------
diff --git 
a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java
 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java
index 4862483..a273f54 100644
--- 
a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java
+++ 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java
@@ -17,14 +17,18 @@
 package org.apache.carbondata.horizon.rest.controller;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.horizon.antlr.Parser;
 import org.apache.carbondata.horizon.rest.model.validate.RequestValidator;
 import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest;
@@ -59,7 +63,26 @@ public class HorizonController {
 
   public HorizonController() throws StoreException {
     String storeFile = System.getProperty("carbonstore.conf.file");
-    store = CarbonStoreFactory.getDistributedStore("GlobalStore", new 
StoreConf(storeFile));
+    StoreConf storeConf = new StoreConf();
+    try {
+      storeConf.conf(StoreConf.STORE_LOCATION, CarbonProperties.getStorePath())
+          .conf(StoreConf.MASTER_HOST, 
InetAddress.getLocalHost().getHostAddress())
+          .conf(StoreConf.MASTER_PORT, CarbonProperties.getSearchMasterPort())
+          .conf(StoreConf.WORKER_HOST, 
InetAddress.getLocalHost().getHostAddress())
+          .conf(StoreConf.WORKER_PORT, CarbonProperties.getSearchWorkerPort())
+          .conf(StoreConf.WORKER_CORE_NUM, 2);
+
+      if (storeFile != null && FileFactory.isFileExist(storeFile)) {
+        storeConf.load(storeFile);
+      }
+
+    } catch (UnknownHostException e) {
+      throw new StoreException(e);
+    } catch (IOException e) {
+      throw new StoreException(e);
+    }
+
+    store = CarbonStoreFactory.getDistributedStore("GlobalStore", storeConf);
   }
 
   @RequestMapping(value = "echo")
@@ -112,7 +135,7 @@ public class HorizonController {
       i++;
     }
     long end = System.currentTimeMillis();
-    LOGGER.audit("[" + request.getRequestId() +  "] HorizonController select " 
+
+    LOGGER.audit("[" + request.getRequestId() + "] HorizonController select " +
         request.getDatabaseName() + "." + request.getTableName() +
         ", take time: " + (end - start) + " ms");
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d462886/store/sql/pom.xml
----------------------------------------------------------------------
diff --git a/store/sql/pom.xml b/store/sql/pom.xml
index d90ebb3..683e1af 100644
--- a/store/sql/pom.xml
+++ b/store/sql/pom.xml
@@ -16,6 +16,7 @@
 
   <properties>
     <dev.path>${basedir}/../../dev</dev.path>
+    <spring.version>1.5.14.RELEASE</spring.version>
   </properties>
 
   <dependencies>
@@ -36,6 +37,33 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-s3</artifactId>
+      <version>1.10.6</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.5.2</version>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
@@ -81,4 +109,109 @@
       </plugin>
     </plugins>
   </build>
+
+  <profiles>
+    <profile>
+      <id>shade</id>
+      <properties>
+        <hadoop.deps.scope>provided</hadoop.deps.scope>
+        <spark.deps.scope>provided</spark.deps.scope>
+        <scala.deps.scope>provided</scala.deps.scope>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-shade-plugin</artifactId>
+            <dependencies>
+              <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <version>${spring.version}</version>
+              </dependency>
+            </dependencies>
+            <configuration>
+              
<keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope>
+              <createDependencyReducedPom>true</createDependencyReducedPom>
+              
<outputFile>${project.build.directory}/horizon-sql-shade.jar</outputFile>
+              <artifactSet>
+                <includes>
+                  <include>*:*</include>
+                  <!--use the following line if we only want carbondata 
related classes-->
+                  <!--<include>org.apache.carbondata:*</include>-->
+                </includes>
+                <excludes>
+                  <exclude>org.apache.hadoop:hadoop-annotations</exclude>
+                  <exclude>org.apache.hadoop:hadoop-auth</exclude>
+                  <exclude>org.apache.hadoop:hadoop-client</exclude>
+                  <exclude>org.apache.hadoop:hadoop-common</exclude>
+                  <exclude>org.apache.hadoop:hadoop-hdfs</exclude>
+                  <exclude>org.apache.hadoop:hadoop-hdfs-client</exclude>
+                  
<exclude>org.apache.hadoop:hadoop-mapreduce-client-app</exclude>
+                  
<exclude>org.apache.hadoop:hadoop-mapreduce-client-common</exclude>
+                  
<exclude>org.apache.hadoop:hadoop-mapreduce-client-core</exclude>
+                  
<exclude>org.apache.hadoop:hadoop-mapreduce-client-jobclient</exclude>
+                  
<exclude>org.apache.hadoop:hadoop-mapreduce-client-shuffle</exclude>
+                  <exclude>org.apache.hadoop:hadoop-yarn-api</exclude>
+                  <exclude>org.apache.hadoop:hadoop-yarn-client</exclude>
+                  <exclude>org.apache.hadoop:hadoop-yarn-common</exclude>
+                  
<exclude>org.apache.hadoop:hadoop-yarn-server-common</exclude>
+                  <exclude>org.apache.spark:*</exclude>
+                  <exclude>org.apache.zookeeper:*</exclude>
+                  <exclude>org.apache.avro:*</exclude>
+                  <exclude>com.google.guava:guava</exclude>
+                  <exclude>org.xerial.snappy:snappy-java</exclude>
+                  <!--add more items to be excluded from the assembly-->
+                </excludes>
+              </artifactSet>
+
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>org/datanucleus/**</exclude>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                    <exclude>META-INF/vfs-providers.xml</exclude>
+                    <exclude>io/netty/**</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+            </configuration>
+            <executions>
+              <execution>
+                <phase>package</phase>
+                <goals>
+                  <goal>shade</goal>
+                </goals>
+                <configuration>
+                  <transformers>
+                    <transformer
+                            
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                      <resource>META-INF/spring.handlers</resource>
+                    </transformer>
+                    <transformer
+                            
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
+                      <resource>META-INF/spring.factories</resource>
+                    </transformer>
+                    <transformer
+                            
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                      <resource>META-INF/spring.schemas</resource>
+                    </transformer>
+                    <transformer
+                            
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"
 />
+                    <transformer
+                            
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                      
<mainClass>sg.butterfly.emenu.api.config.EmenuApp</mainClass>
+                    </transformer>
+                  </transformers>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d462886/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java
----------------------------------------------------------------------
diff --git 
a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java
 
b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java
index 1812a50..9bf8e3a 100644
--- 
a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java
+++ 
b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java
@@ -17,45 +17,51 @@
 
 package org.apache.carbondata.horizon.rest.controller;
 
-import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.Map;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.store.api.conf.StoreConf;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.sql.CarbonSessionBuilder;
 import org.apache.spark.sql.SparkSession;
 
 public class SqlHorizon extends Horizon {
 
-  static SparkSession session;
+  private static LogService LOGGER =
+      LogServiceFactory.getLogService(SqlHorizon.class.getCanonicalName());
 
-  private static void createSession(String[] args) throws IOException {
-    String rootPath = new File(SqlHorizon.class.getResource("/").getPath()
-        + "../../../..").getCanonicalPath();
-    String storeLocation = rootPath + "/examples/spark2/target/store";
-    String warehouse = rootPath + "/examples/spark2/target/warehouse";
-    String metastoredb = rootPath + "/examples/spark2/target";
+  private static SparkSession session;
+  private static Configuration configuration;
+  private static String storeLocation;
 
+  private static void createSession(String[] args) throws IOException {
     CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
+        .addProperty(CarbonCommonConstants.CARBON_TASK_LOCALITY, "false")
         .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, 
"yyyy/MM/dd HH:mm:ss")
         .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
         .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
         .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "");
 
-    int workThreadNum = 2;
-    String masterUrl = "local[" + workThreadNum + "]";
-
-    SparkSession.Builder baseBuilder = SparkSession
-        .builder()
-        .master(masterUrl)
+    SparkSession.Builder baseBuilder = SparkSession.builder()
         .appName("Horizon-SQL")
-        .config("spark.ui.port", 9000)
-        .config("spark.sql.warehouse.dir", warehouse)
-        .config("spark.driver.host", "localhost")
+        .config("spark.ui.port", 9876)
         .config("spark.sql.crossJoin.enabled", "true");
-    session = new CarbonSessionBuilder(baseBuilder).build(storeLocation, 
metastoredb, true);
+
+    Iterator<Map.Entry<String, String>> iterator = configuration.iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<String, String> entry = iterator.next();
+      baseBuilder.config(entry.getKey(), entry.getValue());
+    }
+
+    session = new CarbonSessionBuilder(baseBuilder).build(storeLocation, null, 
true);
   }
 
   static SparkSession getSession() {
@@ -63,20 +69,38 @@ public class SqlHorizon extends Horizon {
   }
 
   public static void main(String[] args) {
+    if (args.length < 5) {
+      LOGGER.error("Usage: SqlHorizon <store location> <fs.s3a.endpoint> 
<fs.s3a.access.key>"
+          + " <fs.s3a.secret.key> <fs.s3a.impl>");
+      return;
+    }
+
+    try {
+      storeLocation = args[0];
+      configuration = new Configuration();
+      configuration.set("fs.s3a.endpoint", args[1]);
+      configuration.set("fs.s3a.access.key", args[2]);
+      configuration.set("fs.s3a.secret.key", args[3]);
+      configuration.set("fs.s3a.impl", args[4]);
+
+      String ip = InetAddress.getLocalHost().getHostAddress();
+      LOGGER.audit("Driver IP: " + ip);
+    } catch (IOException e) {
+      LOGGER.error(e);
+      throw new RuntimeException(e);
+    }
+
     // Start Spring
     String storeConfFile = System.getProperty(StoreConf.STORE_CONF_FILE);
-    if (storeConfFile == null) {
-      storeConfFile = getStoreConfFile();
-    }
     start(SqlHorizon.class, storeConfFile);
 
     try {
-      // Start CarbonSession
-      Thread.sleep(3000);
       createSession(args);
       Thread.sleep(Long.MAX_VALUE);
     } catch (IOException | InterruptedException e) {
+      LOGGER.error(e);
       throw new RuntimeException(e);
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d462886/store/sql/src/main/java/org/apache/carbondata/horizon/rest/util/Upload.java
----------------------------------------------------------------------
diff --git 
a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/util/Upload.java 
b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/util/Upload.java
new file mode 100644
index 0000000..2069a30
--- /dev/null
+++ 
b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/util/Upload.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon.rest.util;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * a util to upload a local file to s3
+ */
+
+public class Upload {
+
+  private static Configuration configuration = null;
+
+  public static void main(String[] args) throws IOException {
+    if (args.length < 2) {
+      System.err.println("Usage: Upload <local file> <s3 path> [overwrite]");
+      return;
+    }
+
+    String sourceFile = args[0];
+    String targetFile = args[1];
+    boolean isOverWrite = false;
+    if (args.length >= 3) {
+      isOverWrite = Boolean.valueOf(args[2]);
+    }
+
+    upload(sourceFile, targetFile, isOverWrite);
+  }
+
+  public static void upload(String sourceFile, String targetFile, boolean 
isOverWrite)
+      throws IOException {
+    Configuration hadoopConf = getConfiguration();
+
+    Path sourcePath = new Path(sourceFile);
+    FileSystem sourceFileSystem = sourcePath.getFileSystem(hadoopConf);
+    if (!sourceFileSystem.exists(sourcePath)) {
+      throw new IOException("source file not exists: " + sourceFile);
+    }
+
+    Path targetPath = new Path(targetFile);
+    FileSystem targetFileSystem = targetPath.getFileSystem(hadoopConf);
+    if (targetFileSystem.exists(targetPath) && !isOverWrite) {
+      throw new IOException("target file exists: " + targetFile);
+    }
+
+    IOUtils.copyBytes(sourceFileSystem.open(sourcePath),
+        targetFileSystem.create(targetPath, isOverWrite), 1024 * 4, true);
+
+    sourceFileSystem.close();
+    targetFileSystem.close();
+  }
+
+  public static synchronized Configuration getConfiguration() {
+    if (configuration == null) {
+      configuration = new Configuration();
+      Properties properties = System.getProperties();
+      for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+        Object key = entry.getKey();
+        Object value = entry.getValue();
+        if (key instanceof String && value instanceof String) {
+          String keyStr = (String) key;
+          if (keyStr.startsWith("hadoop.")) {
+            configuration.set(keyStr.substring("hadoop.".length()), (String) 
value);
+          }
+        }
+      }
+    }
+
+    return configuration;
+  }
+}

Reply via email to