[
https://issues.apache.org/jira/browse/KYLIN-3446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16585792#comment-16585792
]
ASF GitHub Bot commented on KYLIN-3446:
---------------------------------------
shaofengshi closed pull request #195: KYLIN-3446 Connect to HBase out of Spark
URL: https://github.com/apache/kylin/pull/195
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:
diff --git
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 68aa1728d5..3cc123922a 100644
---
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -27,11 +27,15 @@
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
@@ -46,6 +50,7 @@
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.engine.mr.common.CuboidShardUtil;
+import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +68,7 @@
CubeDesc cubeDesc = null;
String segmentID = null;
String cuboidModeName = null;
+ String hbaseConfPath = null;
KylinConfig kylinConfig;
Path partitionFilePath;
@@ -74,6 +80,7 @@ public int run(String[] args) throws Exception {
options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_PARTITION_FILE_PATH);
options.addOption(OPTION_CUBOID_MODE);
+ options.addOption(OPTION_DICT_PATH);
parseOptions(options, args);
partitionFilePath = new
Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
@@ -85,11 +92,12 @@ public int run(String[] args) throws Exception {
kylinConfig = cube.getConfig();
segmentID = getOptionValue(OPTION_SEGMENT_ID);
cuboidModeName = getOptionValue(OPTION_CUBOID_MODE);
+ hbaseConfPath = getOptionValue(OPTION_DICT_PATH);
CubeSegment cubeSegment = cube.getSegmentById(segmentID);
byte[][] splitKeys;
Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment,
kylinConfig).getCuboidSizeMap();
-
+
// for cube planner, will keep cuboidSizeMap unchanged if cube planner
is disabled
Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName);
if (buildingCuboids != null && !buildingCuboids.isEmpty()) {
@@ -104,14 +112,36 @@ public int run(String[] args) throws Exception {
}
cuboidSizeMap = optimizedCuboidSizeMap;
}
-
+
splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap,
kylinConfig, cubeSegment,
partitionFilePath.getParent());
CubeHTableUtil.createHTable(cubeSegment, splitKeys);
+ createHBaseConnection(cubeSegment);
return 0;
}
+ private void createHBaseConnection(CubeSegment cubeSegment) throws
Exception {
+
+ Configuration hbaseConf =
HBaseConnection.getCurrentHBaseConfiguration();
+ HadoopUtil.healSickConfig(hbaseConf);
+ Job job = new Job(hbaseConf,
cubeSegment.getStorageLocationIdentifier());
+ job.getConfiguration().set("spark.hadoop.dfs.replication", "3"); //
HFile, replication=3
+ HTable table = new HTable(hbaseConf,
cubeSegment.getStorageLocationIdentifier());
+ HFileOutputFormat2.configureIncrementalLoadMap(job, table);
+
+ logger.info("Saving HBase configuration to " + hbaseConfPath);
+ FileSystem fs = HadoopUtil.getWorkingFileSystem();
+ FSDataOutputStream out = null;
+ try {
+ out = fs.create(new Path(hbaseConfPath));
+ job.getConfiguration().writeXml(out);
+ } catch (IOException e) {
+ throw new ExecuteException("write hbase configuration failed");
+ } finally {
+ out.close();
+ }
+ }
//one region for one shard
private static byte[][] getSplitsByRegionCount(int regionCount) {
@@ -124,7 +154,9 @@ public int run(String[] args) throws Exception {
return result;
}
- public static byte[][] getRegionSplitsFromCuboidStatistics(final Map<Long,
Double> cubeSizeMap, final KylinConfig kylinConfig, final CubeSegment
cubeSegment, final Path hfileSplitsOutputFolder) throws IOException {
+ public static byte[][] getRegionSplitsFromCuboidStatistics(final Map<Long,
Double> cubeSizeMap,
+ final KylinConfig kylinConfig, final CubeSegment cubeSegment,
final Path hfileSplitsOutputFolder)
+ throws IOException {
final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
float cut = cubeDesc.getConfig().getKylinHBaseRegionCut();
@@ -157,7 +189,8 @@ public int run(String[] args) throws Exception {
}
if (nRegion != original) {
- logger.info("Region count is adjusted from " + original + " to
" + nRegion + " to help random sharding");
+ logger.info(
+ "Region count is adjusted from " + original + " to " +
nRegion + " to help random sharding");
}
}
@@ -188,10 +221,13 @@ public int run(String[] args) throws Exception {
}
if (shardNum > nRegion) {
- logger.info(String.format("Cuboid %d 's estimated size
%.2f MB will generate %d regions, reduce to %d", cuboidId, estimatedSize,
shardNum, nRegion));
+ logger.info(
+ String.format("Cuboid %d 's estimated size %.2f MB
will generate %d regions, reduce to %d",
+ cuboidId, estimatedSize, shardNum,
nRegion));
shardNum = nRegion;
} else {
- logger.info(String.format("Cuboid %d 's estimated size
%.2f MB will generate %d regions", cuboidId, estimatedSize, shardNum));
+ logger.info(String.format("Cuboid %d 's estimated size
%.2f MB will generate %d regions", cuboidId,
+ estimatedSize, shardNum));
}
cuboidShards.put(cuboidId, (short) shardNum);
@@ -204,7 +240,8 @@ public int run(String[] args) throws Exception {
}
for (int i = 0; i < nRegion; ++i) {
- logger.info(String.format("Region %d's estimated size is %.2f
MB, accounting for %.2f percent", i, regionSizes[i], 100.0 * regionSizes[i] /
totalSizeInM));
+ logger.info(String.format("Region %d's estimated size is %.2f
MB, accounting for %.2f percent", i,
+ regionSizes[i], 100.0 * regionSizes[i] /
totalSizeInM));
}
CuboidShardUtil.saveCuboidShards(cubeSegment, cuboidShards,
nRegion);
@@ -222,7 +259,8 @@ public int run(String[] args) throws Exception {
if (size >= mbPerRegion || (size + cubeSizeMap.get(cuboidId))
>= mbPerRegion * 1.2) {
// if the size already bigger than threshold, or it will
exceed by 20%, cut for next region
regionSplit.add(cuboidId);
- logger.info("Region " + regionIndex + " will be " + size +
" MB, contains cuboids < " + cuboidId + " (" + cuboidCount + ") cuboids");
+ logger.info("Region " + regionIndex + " will be " + size +
" MB, contains cuboids < " + cuboidId
+ + " (" + cuboidCount + ") cuboids");
size = 0;
cuboidCount = 0;
regionIndex++;
@@ -240,7 +278,8 @@ public int run(String[] args) throws Exception {
}
}
- protected static void saveHFileSplits(final List<HashMap<Long, Double>>
innerRegionSplits, int mbPerRegion, final Path outputFolder, final KylinConfig
kylinConfig) throws IOException {
+ protected static void saveHFileSplits(final List<HashMap<Long, Double>>
innerRegionSplits, int mbPerRegion,
+ final Path outputFolder, final KylinConfig kylinConfig) throws
IOException {
if (outputFolder == null) {
logger.warn("outputFolder for hfile split file is null, skip inner
region split");
@@ -300,7 +339,8 @@ protected static void saveHFileSplits(final
List<HashMap<Long, Double>> innerReg
logger.info(String.format("Region %d's hfile %d size is
%.2f mb", i, j, accumulatedSize));
byte[] split = new
byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN];
BytesUtil.writeUnsigned(i, split, 0,
RowConstants.ROWKEY_SHARDID_LEN);
- System.arraycopy(Bytes.toBytes(cuboid), 0, split,
RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
+ System.arraycopy(Bytes.toBytes(cuboid), 0, split,
RowConstants.ROWKEY_SHARDID_LEN,
+ RowConstants.ROWKEY_CUBOIDID_LEN);
splits.add(split);
accumulatedSize = 0;
j++;
@@ -310,11 +350,15 @@ protected static void saveHFileSplits(final
List<HashMap<Long, Double>> innerReg
}
- SequenceFile.Writer hfilePartitionWriter =
SequenceFile.createWriter(hbaseConf,
SequenceFile.Writer.file(hfilePartitionFile),
SequenceFile.Writer.keyClass(RowKeyWritable.class),
SequenceFile.Writer.valueClass(NullWritable.class));
+ SequenceFile.Writer hfilePartitionWriter =
SequenceFile.createWriter(hbaseConf,
+ SequenceFile.Writer.file(hfilePartitionFile),
SequenceFile.Writer.keyClass(RowKeyWritable.class),
+ SequenceFile.Writer.valueClass(NullWritable.class));
for (int i = 0; i < splits.size(); i++) {
//when we compare the rowkey, we compare the row firstly.
- hfilePartitionWriter.append(new
RowKeyWritable(KeyValue.createFirstOnRow(splits.get(i)).createKeyOnly(false).getKey()),
NullWritable.get());
+ hfilePartitionWriter.append(
+ new
RowKeyWritable(KeyValue.createFirstOnRow(splits.get(i)).createKeyOnly(false).getKey()),
+ NullWritable.get());
}
hfilePartitionWriter.close();
}
diff --git
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
index 4fda1398cd..dfe7d0cefd 100644
---
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
+++
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
@@ -59,8 +59,10 @@ public HadoopShellExecutable createCreateHTableStep(String
jobId, CuboidModeEnum
StringBuilder cmd = new StringBuilder();
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME,
seg.getRealization().getName());
appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID,
seg.getUuid());
- appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION,
getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION,
+ getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE,
cuboidMode.toString());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_DICT_PATH,
getJobWorkingDir(jobId) + "/hbase-conf.xml");
createHtableStep.setJobParams(cmd.toString());
createHtableStep.setJobClass(CreateHTableJob.class);
@@ -69,7 +71,8 @@ public HadoopShellExecutable createCreateHTableStep(String
jobId, CuboidModeEnum
}
// TODO make it abstract
- public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg,
List<CubeSegment> mergingSegments, String jobID, Class<? extends
AbstractHadoopJob> clazz) {
+ public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg,
List<CubeSegment> mergingSegments,
+ String jobID, Class<? extends AbstractHadoopJob> clazz) {
final List<String> mergingCuboidPaths = Lists.newArrayList();
for (CubeSegment merging : mergingSegments) {
mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
@@ -86,7 +89,8 @@ public MapReduceExecutable
createMergeCuboidDataStep(CubeSegment seg, List<CubeS
appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID,
seg.getUuid());
appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, formattedPath);
appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
- appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
"Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+ "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() +
"_Step");
mergeCuboidDataStep.setMapReduceParams(cmd.toString());
mergeCuboidDataStep.setMapReduceJobClass(clazz);
@@ -148,8 +152,10 @@ public MergeGCStep createOptimizeGCStep() {
}
public List<String> getMergingHTables() {
- final List<CubeSegment> mergingSegments = ((CubeInstance)
seg.getRealization()).getMergingSegments((CubeSegment) seg);
- Preconditions.checkState(mergingSegments.size() > 1, "there should be
more than 2 segments to merge, target segment " + seg);
+ final List<CubeSegment> mergingSegments = ((CubeInstance)
seg.getRealization())
+ .getMergingSegments((CubeSegment) seg);
+ Preconditions.checkState(mergingSegments.size() > 1,
+ "there should be more than 2 segments to merge, target segment
" + seg);
final List<String> mergingHTables = Lists.newArrayList();
for (CubeSegment merging : mergingSegments) {
mergingHTables.add(merging.getStorageLocationIdentifier());
@@ -158,8 +164,10 @@ public MergeGCStep createOptimizeGCStep() {
}
public List<String> getMergingHDFSPaths() {
- final List<CubeSegment> mergingSegments = ((CubeInstance)
seg.getRealization()).getMergingSegments((CubeSegment) seg);
- Preconditions.checkState(mergingSegments.size() > 1, "there should be
more than 2 segments to merge, target segment " + seg);
+ final List<CubeSegment> mergingSegments = ((CubeInstance)
seg.getRealization())
+ .getMergingSegments((CubeSegment) seg);
+ Preconditions.checkState(mergingSegments.size() > 1,
+ "there should be more than 2 segments to merge, target segment
" + seg);
final List<String> mergingHDFSPaths = Lists.newArrayList();
for (CubeSegment merging : mergingSegments) {
mergingHDFSPaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
@@ -180,11 +188,13 @@ public MergeGCStep createOptimizeGCStep() {
}
public String getHFilePath(String jobId) {
- return
HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" +
seg.getRealization().getName() + "/hfile/");
+ return HBaseConnection.makeQualifiedPathInHBaseCluster(
+ getJobWorkingDir(jobId) + "/" + seg.getRealization().getName()
+ "/hfile/");
}
public String getRowkeyDistributionOutputPath(String jobId) {
- return
HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" +
seg.getRealization().getName() + "/rowkey_stats");
+ return HBaseConnection.makeQualifiedPathInHBaseCluster(
+ getJobWorkingDir(jobId) + "/" + seg.getRealization().getName()
+ "/rowkey_stats");
}
public void addOptimizeGarbageCollectionSteps(DefaultChainedExecutable
jobFlow) {
diff --git
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
index 622a0e8891..e599bfce03 100644
---
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
+++
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -48,6 +48,7 @@ public AbstractExecutable
createConvertCuboidToHfileStep(String jobId) {
sparkExecutable.setParam(SparkCubeHFile.OPTION_OUTPUT_PATH.getOpt(),
getHFilePath(jobId));
sparkExecutable.setParam(SparkCubeHFile.OPTION_PARTITION_FILE_PATH.getOpt(),
getRowkeyDistributionOutputPath(jobId) +
"/part-r-00000_hfile");
+ sparkExecutable.setParam(SparkCubeHFile.OPTION_WORKING_PATH.getOpt(),
getJobWorkingDir(jobId));
sparkExecutable.setJobId(jobId);
diff --git
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index fd8459fd51..32a4cd4119 100644
---
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -17,7 +17,6 @@
*/
package org.apache.kylin.storage.hbase.steps;
-import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -30,12 +29,11 @@
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
@@ -58,7 +56,6 @@
import org.apache.kylin.engine.spark.KylinSparkJobListener;
import org.apache.kylin.engine.spark.SparkUtil;
import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
@@ -91,6 +88,8 @@
.isRequired(true).withDescription("Cuboid files
PATH").create(BatchConstants.ARG_INPUT);
public static final Option OPTION_PARTITION_FILE_PATH =
OptionBuilder.withArgName(BatchConstants.ARG_PARTITION)
.hasArg().isRequired(true).withDescription("Partition file
path.").create(BatchConstants.ARG_PARTITION);
+ public static final Option OPTION_WORKING_PATH =
OptionBuilder.withArgName(BatchConstants.ARG_DICT_PATH).hasArg()
+ .isRequired(true).withDescription("Job working
path").create(BatchConstants.ARG_DICT_PATH);
private Options options;
@@ -102,6 +101,7 @@ public SparkCubeHFile() {
options.addOption(OPTION_META_URL);
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_PARTITION_FILE_PATH);
+ options.addOption(OPTION_WORKING_PATH);
}
@Override
@@ -117,6 +117,7 @@ protected void execute(OptionsHelper optionsHelper) throws
Exception {
final String segmentId =
optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
final String outputPath =
optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
final Path partitionFilePath = new
Path(optionsHelper.getOptionValue(OPTION_PARTITION_FILE_PATH));
+ final String workingPath =
optionsHelper.getOptionValue(OPTION_WORKING_PATH);
Class[] kryoClassArray = new Class[] {
Class.forName("scala.reflect.ClassTag$$anon$1"), KeyValueCreator.class,
KeyValue.class, RowKeyWritable.class };
@@ -171,17 +172,15 @@ protected void execute(OptionsHelper optionsHelper)
throws Exception {
}
logger.info("There are " + keys.size() + " split keys, totally " +
(keys.size() + 1) + " hfiles");
- Configuration hbaseConf =
HBaseConnection.getCurrentHBaseConfiguration();
- HadoopUtil.healSickConfig(hbaseConf);
- Job job = new Job(hbaseConf,
cubeSegment.getStorageLocationIdentifier());
- job.getConfiguration().set("spark.hadoop.dfs.replication", "3"); //
HFile, replication=3
- HTable table = new HTable(hbaseConf,
cubeSegment.getStorageLocationIdentifier());
- try {
- HFileOutputFormat2.configureIncrementalLoadMap(job, table);
- } catch (IOException ioe) {
- // this can be ignored.
- logger.debug(ioe.getMessage(), ioe);
- }
+
+ //HBase conf
+ String hbasePath = workingPath + "/hbase-conf.xml";
+ FSDataInputStream confInput = fs.open(new Path(hbasePath));
+ logger.info("Loading HBase configuration from:" + hbasePath);
+
+ Configuration hbaseJobConf = new Configuration();
+ hbaseJobConf.addResource(confInput);
+ Job job = new Job(hbaseJobConf,
cubeSegment.getStorageLocationIdentifier());
FileOutputFormat.setOutputPath(job, new Path(outputPath));
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Convert to HFile in spark reports ZK connection refused
> -------------------------------------------------------
>
> Key: KYLIN-3446
> URL: https://issues.apache.org/jira/browse/KYLIN-3446
> Project: Kylin
> Issue Type: Bug
> Components: Spark Engine
> Reporter: Shaofeng SHI
> Assignee: Yichen Zhou
> Priority: Major
> Fix For: v2.5.0
>
>
> {code:java}
> to server localhost/127.0.0.1:2181. Will not attempt to authenticate using
> SASL (unknown error)
> 2018-07-12 18:51:21,001 INFO [Scheduler 1109292714 Job
> 62f42193-20ff-4ca9-b898-52978a473bce-864] spark.SparkExecutable:38 : 18/07/12
> 18:51:21 WARN zookeeper.ClientCnxn: Session 0x0 for server null, unexpected
> error, closing socket connection and attempting reconnect
> 2018-07-12 18:51:21,002 INFO [Scheduler 1109292714 Job
> 62f42193-20ff-4ca9-b898-52978a473bce-864] spark.SparkExecutable:38 :
> java.net.ConnectException: Connection refused
> 2018-07-12 18:51:21,002 INFO [Scheduler 1109292714 Job
> 62f42193-20ff-4ca9-b898-52978a473bce-864] spark.SparkExecutable:38 : at
> sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> 2018-07-12 18:51:21,002 INFO [Scheduler 1109292714 Job
> 62f42193-20ff-4ca9-b898-52978a473bce-864] spark.SparkExecutable:38 : at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> 2018-07-12 18:51:21,002 INFO [Scheduler 1109292714 Job
> 62f42193-20ff-4ca9-b898-52978a473bce-864] spark.SparkExecutable:38 : at
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> 2018-07-12 18:51:21,002 INFO [Scheduler 1109292714 Job
> 62f42193-20ff-4ca9-b898-52978a473bce-864] spark.SparkExecutable:38 : at
> org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)