This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit cddb493884e624c12c3452bad4de7482fc37de8d
Author: chao long <wayn...@qq.com>
AuthorDate: Mon Dec 10 13:05:27 2018 +0800

    KYLIN-3680 Spark cubing failed with JDBC data source
---
 .../org/apache/kylin/common/util/HadoopUtil.java | 24 +++++++++
 .../org/apache/kylin/engine/spark/SparkUtil.java | 63 ++++++++++++----------
 2 files changed, 60 insertions(+), 27 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index f3123a2..5d09ea7 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -236,4 +236,28 @@ public class HadoopUtil {
         return readFromSequenceFile(getCurrentConfiguration(), inputPath);
     }
 
+    public static boolean isSequenceFile(Configuration conf, Path filePath) {
+        try (SequenceFile.Reader reader = new SequenceFile.Reader(getWorkingFileSystem(conf), filePath, conf)) {
+            return true;
+        } catch (Exception e) {
+            logger.warn("Read sequence file {} failed.", filePath.getName(), e);
+            return false;
+        }
+    }
+
+    public static boolean isSequenceDir(Configuration conf, Path fileDir) throws IOException {
+        FileSystem fs = getWorkingFileSystem(conf);
+        FileStatus[] fileStatuses = fs.listStatus(fileDir, new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                return !"_SUCCESS".equals(path.getName());
+            }
+        });
+
+        if (fileStatuses != null && fileStatuses.length > 0) {
+            return isSequenceFile(conf, fileStatuses[0].getPath());
+        }
+
+        return false;
+    }
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index 1c4086d..151103a 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.EngineFactory;
@@ -142,39 +143,47 @@ public class SparkUtil {
         sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec
     }
 
-    public static JavaRDD<String[]> hiveRecordInputRDD(boolean isSequenceFile, JavaSparkContext sc, String inputPath, String hiveTable) {
+    public static JavaRDD<String[]> hiveRecordInputRDD(boolean isSequenceFile, JavaSparkContext sc, String inputPath, String hiveTable) throws IOException {
         JavaRDD<String[]> recordRDD;
 
-        if (isSequenceFile) {
-            recordRDD = sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
-                    .map(new Function<Text, String[]>() {
-                        @Override
-                        public String[] call(Text text) throws Exception {
-                            String s = Bytes.toString(text.getBytes(), 0, text.getLength());
-                            return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
-                        }
-                    });
+        if (isSequenceFile && HadoopUtil.isSequenceDir(sc.hadoopConfiguration(), new Path(inputPath))) {
+            recordRDD = getSequenceFormatHiveInput(sc, inputPath);
         } else {
-            SparkSession sparkSession = SparkSession.builder().config(sc.getConf()).enableHiveSupport().getOrCreate();
-            final Dataset intermediateTable = sparkSession.table(hiveTable);
-            recordRDD = intermediateTable.javaRDD().map(new Function<Row, String[]>() {
-                @Override
-                public String[] call(Row row) throws Exception {
-                    String[] result = new String[row.size()];
-                    for (int i = 0; i < row.size(); i++) {
-                        final Object o = row.get(i);
-                        if (o != null) {
-                            result[i] = o.toString();
-                        } else {
-                            result[i] = null;
-                        }
-                    }
-                    return result;
-                }
-            });
+            recordRDD = getOtherFormatHiveInput(sc, hiveTable);
         }
 
         return recordRDD;
     }
 
+    private static JavaRDD<String[]> getSequenceFormatHiveInput(JavaSparkContext sc, String inputPath) {
+        return sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
+                .map(new Function<Text, String[]>() {
+                    @Override
+                    public String[] call(Text text) throws Exception {
+                        String s = Bytes.toString(text.getBytes(), 0, text.getLength());
+                        return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
+                    }
+                });
+    }
+
+    private static JavaRDD<String[]> getOtherFormatHiveInput(JavaSparkContext sc, String hiveTable) {
+        SparkSession sparkSession = SparkSession.builder().config(sc.getConf()).enableHiveSupport().getOrCreate();
+        final Dataset intermediateTable = sparkSession.table(hiveTable);
+        return intermediateTable.javaRDD().map(new Function<Row, String[]>() {
+            @Override
+            public String[] call(Row row) throws Exception {
+                String[] result = new String[row.size()];
+                for (int i = 0; i < row.size(); i++) {
+                    final Object o = row.get(i);
+                    if (o != null) {
+                        result[i] = o.toString();
+                    } else {
+                        result[i] = null;
+                    }
+                }
+                return result;
+            }
+        });
+    }
+
 }
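
Background on the fix: hiveRecordInputRDD previously trusted the isSequenceFile flag alone, but with a JDBC data source the intermediate flat table is not stored as sequence files, so reading it through sc.sequenceFile() failed. The patch probes the input directory first (HadoopUtil.isSequenceDir) and otherwise falls back to reading the Hive table through SparkSession. Below is a minimal caller-side sketch of the new probe; SequenceDirProbe and its argument handling are hypothetical, and it assumes the patched HadoopUtil is on the classpath.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.kylin.common.util.HadoopUtil;

    public class SequenceDirProbe {
        public static void main(String[] args) throws Exception {
            // Hypothetical harness, not part of this commit.
            Configuration conf = HadoopUtil.getCurrentConfiguration();
            Path flatTableDir = new Path(args[0]); // e.g. the flat table's HDFS location

            // isSequenceDir() opens the first non-"_SUCCESS" file in the
            // directory with SequenceFile.Reader; for a JDBC-sourced flat
            // table this is expected to return false, steering cubing to
            // the SparkSession/Hive fallback instead of sc.sequenceFile().
            boolean isSeq = HadoopUtil.isSequenceDir(conf, flatTableDir);
            System.out.println(flatTableDir + " sequence-format: " + isSeq);
        }
    }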