Repository: carbondata Updated Branches: refs/heads/carbonstore 7ad2fd951 -> 2d4628868
[CARBONDATA-2767][CarbonStore] Fix task locality issue. If the Spark cluster and the Hadoop cluster run on two different sets of machines, the Spark tasks run in RACK_LOCAL mode. This change adds the carbon.task.locality property (default: true); when it is set to false, CarbonScanRDD reports no preferred locations, so tasks are executed immediately without regard to data locality. This closes #2528 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2d462886 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2d462886 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2d462886 Branch: refs/heads/carbonstore Commit: 2d4628868fc9d07ad1f8c84c6e458f48b0a13998 Parents: 7ad2fd9 Author: QiangCai <[email protected]> Authored: Thu Jul 19 14:50:38 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Wed Jul 25 20:50:42 2018 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 9 ++ .../carbondata/core/util/CarbonProperties.java | 7 + .../carbondata/spark/rdd/CarbonScanRDD.scala | 15 ++- pom.xml | 3 + .../horizon/rest/controller/Horizon.java | 24 +--- .../rest/controller/HorizonController.java | 27 +++- store/sql/pom.xml | 133 +++++++++++++++++++ .../horizon/rest/controller/SqlHorizon.java | 70 ++++++---- .../carbondata/horizon/rest/util/Upload.java | 94 +++++++++++++ 9 files changed, 337 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d462886/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 75d6014..fb8e221 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1882,6 +1882,15 
@@ public final class CarbonCommonConstants { public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "true"; + /** + * config carbon scan task locality + * true: it will execute tasks as close to the data, the locality is important, + * false: it will execute tasks immediately, not paying attention to the locality + */ + public static final String CARBON_TASK_LOCALITY = "carbon.task.locality"; + + public static final String CARBON_TASK_LOCALITY_DEFAULT = "true"; + private CarbonCommonConstants() { } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d462886/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index b60e6a3..49eb7fb 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -1655,4 +1655,11 @@ public final class CarbonProperties { return CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT; } } + + public static boolean isTaskLocality() { + String taskLocality = getInstance().getProperty( + CarbonCommonConstants.CARBON_TASK_LOCALITY, + CarbonCommonConstants.CARBON_TASK_LOCALITY_DEFAULT); + return taskLocality.equalsIgnoreCase("true"); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d462886/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 149f711..1434b52 100644 --- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -87,6 +87,8 @@ class CarbonScanRDD[T: ClassTag]( } private var vectorReader = false + @transient private val isTaskLocality = CarbonProperties.isTaskLocality + private val bucketedTable = tableInfo.getFactTable.getBucketingInfo private val storageFormat = tableInfo.getFormat @@ -739,9 +741,16 @@ class CarbonScanRDD[T: ClassTag]( * Get the preferred locations where to launch this task. */ override def getPreferredLocations(split: Partition): Seq[String] = { - val theSplit = split.asInstanceOf[CarbonSparkPartition] - val firstOptionLocation = theSplit.split.value.getLocations.filter(_ != "localhost") - firstOptionLocation + if (isTaskLocality) { + split.asInstanceOf[CarbonSparkPartition] + .split + .value + .getLocations + .filter(_ != "localhost") + } else { + // when the computation and the storage are separated, not require the preferred locations + Seq.empty[String] + } } def createVectorizedCarbonRecordReader(queryModel: QueryModel, http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d462886/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 2135b9d..e10acc7 100644 --- a/pom.xml +++ b/pom.xml @@ -586,6 +586,9 @@ </profile> <profile> <id>horizon</id> + <properties> + <hadoop.version>2.8.3</hadoop.version> + </properties> <modules> <module>store/horizon</module> <module>store/sql</module> http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d462886/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java index 
9d876ab..a30b587 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java @@ -17,9 +17,6 @@ package org.apache.carbondata.horizon.rest.controller; -import java.io.File; -import java.io.IOException; - import org.apache.carbondata.store.api.conf.StoreConf; import org.springframework.boot.SpringApplication; @@ -33,31 +30,24 @@ public class Horizon { public static void main(String[] args) { String storeConfFile = System.getProperty(StoreConf.STORE_CONF_FILE); - if (storeConfFile == null) { - storeConfFile = getStoreConfFile(); - } start(storeConfFile); } - static String getStoreConfFile() { - try { - return new File(".").getCanonicalPath() + "/store/conf/store.conf"; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - public static void start(String storeConfFile) { start(Horizon.class, storeConfFile); } static <T> void start(final Class<T> classTag, String storeConfFile) { - System.setProperty("carbonstore.conf.file", storeConfFile); - new Thread() { + if (storeConfFile != null) { + System.setProperty("carbonstore.conf.file", storeConfFile); + } + Thread thread = new Thread() { public void run() { context = SpringApplication.run(classTag); } - }.start(); + }; + thread.setDaemon(true); + thread.start(); } public static void stop() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d462886/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java index 4862483..a273f54 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java +++ 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java @@ -17,14 +17,18 @@ package org.apache.carbondata.horizon.rest.controller; import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.Iterator; import java.util.List; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.horizon.antlr.Parser; import org.apache.carbondata.horizon.rest.model.validate.RequestValidator; import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest; @@ -59,7 +63,26 @@ public class HorizonController { public HorizonController() throws StoreException { String storeFile = System.getProperty("carbonstore.conf.file"); - store = CarbonStoreFactory.getDistributedStore("GlobalStore", new StoreConf(storeFile)); + StoreConf storeConf = new StoreConf(); + try { + storeConf.conf(StoreConf.STORE_LOCATION, CarbonProperties.getStorePath()) + .conf(StoreConf.MASTER_HOST, InetAddress.getLocalHost().getHostAddress()) + .conf(StoreConf.MASTER_PORT, CarbonProperties.getSearchMasterPort()) + .conf(StoreConf.WORKER_HOST, InetAddress.getLocalHost().getHostAddress()) + .conf(StoreConf.WORKER_PORT, CarbonProperties.getSearchWorkerPort()) + .conf(StoreConf.WORKER_CORE_NUM, 2); + + if (storeFile != null && FileFactory.isFileExist(storeFile)) { + storeConf.load(storeFile); + } + + } catch (UnknownHostException e) { + throw new StoreException(e); + } catch (IOException e) { + throw new StoreException(e); + } + + store = CarbonStoreFactory.getDistributedStore("GlobalStore", storeConf); } 
@RequestMapping(value = "echo") @@ -112,7 +135,7 @@ public class HorizonController { i++; } long end = System.currentTimeMillis(); - LOGGER.audit("[" + request.getRequestId() + "] HorizonController select " + + LOGGER.audit("[" + request.getRequestId() + "] HorizonController select " + request.getDatabaseName() + "." + request.getTableName() + ", take time: " + (end - start) + " ms"); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d462886/store/sql/pom.xml ---------------------------------------------------------------------- diff --git a/store/sql/pom.xml b/store/sql/pom.xml index d90ebb3..683e1af 100644 --- a/store/sql/pom.xml +++ b/store/sql/pom.xml @@ -16,6 +16,7 @@ <properties> <dev.path>${basedir}/../../dev</dev.path> + <spring.version>1.5.14.RELEASE</spring.version> </properties> <dependencies> @@ -36,6 +37,33 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aws</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-s3</artifactId> + <version>1.10.6</version> + <exclusions> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.5.2</version> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> @@ -81,4 +109,109 @@ </plugin> </plugins> </build> + + <profiles> + <profile> + <id>shade</id> + <properties> + <hadoop.deps.scope>provided</hadoop.deps.scope> + <spark.deps.scope>provided</spark.deps.scope> + <scala.deps.scope>provided</scala.deps.scope> + </properties> + <build> + <plugins> + <plugin> + 
<groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <dependencies> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-maven-plugin</artifactId> + <version>${spring.version}</version> + </dependency> + </dependencies> + <configuration> + <keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope> + <createDependencyReducedPom>true</createDependencyReducedPom> + <outputFile>${project.build.directory}/horizon-sql-shade.jar</outputFile> + <artifactSet> + <includes> + <include>*:*</include> + <!--use the following line if we only want carbondata related classes--> + <!--<include>org.apache.carbondata:*</include>--> + </includes> + <excludes> + <exclude>org.apache.hadoop:hadoop-annotations</exclude> + <exclude>org.apache.hadoop:hadoop-auth</exclude> + <exclude>org.apache.hadoop:hadoop-client</exclude> + <exclude>org.apache.hadoop:hadoop-common</exclude> + <exclude>org.apache.hadoop:hadoop-hdfs</exclude> + <exclude>org.apache.hadoop:hadoop-hdfs-client</exclude> + <exclude>org.apache.hadoop:hadoop-mapreduce-client-app</exclude> + <exclude>org.apache.hadoop:hadoop-mapreduce-client-common</exclude> + <exclude>org.apache.hadoop:hadoop-mapreduce-client-core</exclude> + <exclude>org.apache.hadoop:hadoop-mapreduce-client-jobclient</exclude> + <exclude>org.apache.hadoop:hadoop-mapreduce-client-shuffle</exclude> + <exclude>org.apache.hadoop:hadoop-yarn-api</exclude> + <exclude>org.apache.hadoop:hadoop-yarn-client</exclude> + <exclude>org.apache.hadoop:hadoop-yarn-common</exclude> + <exclude>org.apache.hadoop:hadoop-yarn-server-common</exclude> + <exclude>org.apache.spark:*</exclude> + <exclude>org.apache.zookeeper:*</exclude> + <exclude>org.apache.avro:*</exclude> + <exclude>com.google.guava:guava</exclude> + <exclude>org.xerial.snappy:snappy-java</exclude> + <!--add more items to be excluded from the assembly--> + </excludes> + </artifactSet> + + <filters> + <filter> + 
<artifact>*:*</artifact> + <excludes> + <exclude>org/datanucleus/**</exclude> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + <exclude>META-INF/vfs-providers.xml</exclude> + <exclude>io/netty/**</exclude> + </excludes> + </filter> + </filters> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> + <resource>META-INF/spring.handlers</resource> + </transformer> + <transformer + implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer"> + <resource>META-INF/spring.factories</resource> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> + <resource>META-INF/spring.schemas</resource> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>sg.butterfly.emenu.api.config.EmenuApp</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> </project> http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d462886/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java ---------------------------------------------------------------------- diff --git a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java index 1812a50..9bf8e3a 100644 --- a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java +++ b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java 
@@ -17,45 +17,51 @@ package org.apache.carbondata.horizon.rest.controller; -import java.io.File; import java.io.IOException; +import java.net.InetAddress; +import java.util.Iterator; +import java.util.Map; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.store.api.conf.StoreConf; +import org.apache.hadoop.conf.Configuration; import org.apache.spark.sql.CarbonSessionBuilder; import org.apache.spark.sql.SparkSession; public class SqlHorizon extends Horizon { - static SparkSession session; + private static LogService LOGGER = + LogServiceFactory.getLogService(SqlHorizon.class.getCanonicalName()); - private static void createSession(String[] args) throws IOException { - String rootPath = new File(SqlHorizon.class.getResource("/").getPath() - + "../../../..").getCanonicalPath(); - String storeLocation = rootPath + "/examples/spark2/target/store"; - String warehouse = rootPath + "/examples/spark2/target/warehouse"; - String metastoredb = rootPath + "/examples/spark2/target"; + private static SparkSession session; + private static Configuration configuration; + private static String storeLocation; + private static void createSession(String[] args) throws IOException { CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation) + .addProperty(CarbonCommonConstants.CARBON_TASK_LOCALITY, "false") .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss") .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd") .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true") .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, ""); - int workThreadNum = 2; - String masterUrl = "local[" + workThreadNum + "]"; - - SparkSession.Builder baseBuilder = SparkSession - .builder() 
- .master(masterUrl) + SparkSession.Builder baseBuilder = SparkSession.builder() .appName("Horizon-SQL") - .config("spark.ui.port", 9000) - .config("spark.sql.warehouse.dir", warehouse) - .config("spark.driver.host", "localhost") + .config("spark.ui.port", 9876) .config("spark.sql.crossJoin.enabled", "true"); - session = new CarbonSessionBuilder(baseBuilder).build(storeLocation, metastoredb, true); + + Iterator<Map.Entry<String, String>> iterator = configuration.iterator(); + while (iterator.hasNext()) { + Map.Entry<String, String> entry = iterator.next(); + baseBuilder.config(entry.getKey(), entry.getValue()); + } + + session = new CarbonSessionBuilder(baseBuilder).build(storeLocation, null, true); } static SparkSession getSession() { @@ -63,20 +69,38 @@ public class SqlHorizon extends Horizon { } public static void main(String[] args) { + if (args.length < 5) { + LOGGER.error("Usage: SqlHorizon <store location> <fs.s3a.endpoint> <fs.s3a.access.key>" + + " <fs.s3a.secret.key> <fs.s3a.impl>"); + return; + } + + try { + storeLocation = args[0]; + configuration = new Configuration(); + configuration.set("fs.s3a.endpoint", args[1]); + configuration.set("fs.s3a.access.key", args[2]); + configuration.set("fs.s3a.secret.key", args[3]); + configuration.set("fs.s3a.impl", args[4]); + + String ip = InetAddress.getLocalHost().getHostAddress(); + LOGGER.audit("Driver IP: " + ip); + } catch (IOException e) { + LOGGER.error(e); + throw new RuntimeException(e); + } + // Start Spring String storeConfFile = System.getProperty(StoreConf.STORE_CONF_FILE); - if (storeConfFile == null) { - storeConfFile = getStoreConfFile(); - } start(SqlHorizon.class, storeConfFile); try { - // Start CarbonSession - Thread.sleep(3000); createSession(args); Thread.sleep(Long.MAX_VALUE); } catch (IOException | InterruptedException e) { + LOGGER.error(e); throw new RuntimeException(e); } } + } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d462886/store/sql/src/main/java/org/apache/carbondata/horizon/rest/util/Upload.java ---------------------------------------------------------------------- diff --git a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/util/Upload.java b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/util/Upload.java new file mode 100644 index 0000000..2069a30 --- /dev/null +++ b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/util/Upload.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.horizon.rest.util; + +import java.io.IOException; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; + +/** + * a util to upload a local file to s3 + */ + +public class Upload { + + private static Configuration configuration = null; + + public static void main(String[] args) throws IOException { + if (args.length < 2) { + System.err.println("Usage: Upload <local file> <s3 path> [overwrite]"); + return; + } + + String sourceFile = args[0]; + String targetFile = args[1]; + boolean isOverWrite = false; + if (args.length >= 3) { + isOverWrite = Boolean.valueOf(args[2]); + } + + upload(sourceFile, targetFile, isOverWrite); + } + + public static void upload(String sourceFile, String targetFile, boolean isOverWrite) + throws IOException { + Configuration hadoopConf = getConfiguration(); + + Path sourcePath = new Path(sourceFile); + FileSystem sourceFileSystem = sourcePath.getFileSystem(hadoopConf); + if (!sourceFileSystem.exists(sourcePath)) { + throw new IOException("source file not exists: " + sourceFile); + } + + Path targetPath = new Path(targetFile); + FileSystem targetFileSystem = targetPath.getFileSystem(hadoopConf); + if (targetFileSystem.exists(targetPath) && !isOverWrite) { + throw new IOException("target file exists: " + targetFile); + } + + IOUtils.copyBytes(sourceFileSystem.open(sourcePath), + targetFileSystem.create(targetPath, isOverWrite), 1024 * 4, true); + + sourceFileSystem.close(); + targetFileSystem.close(); + } + + public static synchronized Configuration getConfiguration() { + if (configuration == null) { + configuration = new Configuration(); + Properties properties = System.getProperties(); + for (Map.Entry<Object, Object> entry : properties.entrySet()) { + Object key = entry.getKey(); + Object value = entry.getValue(); + if (key instanceof String 
&& value instanceof String) { + String keyStr = (String) key; + if (keyStr.startsWith("hadoop.")) { + configuration.set(keyStr.substring("hadoop.".length()), (String) value); + } + } + } + } + + return configuration; + } +}
