This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit cddb493884e624c12c3452bad4de7482fc37de8d
Author: chao long <wayn...@qq.com>
AuthorDate: Mon Dec 10 13:05:27 2018 +0800

    KYLIN-3680 Spark cubing failed with JDBC data source
---
 .../org/apache/kylin/common/util/HadoopUtil.java   | 24 +++++++++
 .../org/apache/kylin/engine/spark/SparkUtil.java   | 63 ++++++++++++----------
 2 files changed, 60 insertions(+), 27 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index f3123a2..5d09ea7 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -236,4 +236,28 @@ public class HadoopUtil {
         return readFromSequenceFile(getCurrentConfiguration(), inputPath);
     }
 
+    public static boolean isSequenceFile(Configuration conf, Path filePath) {
+        try (SequenceFile.Reader reader = new SequenceFile.Reader(getWorkingFileSystem(conf), filePath, conf)) {
+            return true;
+        } catch (Exception e) {
+            logger.warn("Read sequence file {} failed.", filePath.getName(), e);
+            return false;
+        }
+    }
+
+    public static boolean isSequenceDir(Configuration conf, Path fileDir) throws IOException {
+        FileSystem fs = getWorkingFileSystem(conf);
+        FileStatus[] fileStatuses = fs.listStatus(fileDir, new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                return !"_SUCCESS".equals(path.getName());
+            }
+        });
+
+        if (fileStatuses != null && fileStatuses.length > 0) {
+            return isSequenceFile(conf, fileStatuses[0].getPath());
+        }
+
+        return false;
+    }
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index 1c4086d..151103a 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.EngineFactory;
@@ -142,39 +143,47 @@ public class SparkUtil {
         sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec
     }
 
-    public static JavaRDD<String[]> hiveRecordInputRDD(boolean isSequenceFile, JavaSparkContext sc, String inputPath, String hiveTable) {
+    public static JavaRDD<String[]> hiveRecordInputRDD(boolean isSequenceFile, JavaSparkContext sc, String inputPath, String hiveTable) throws IOException {
         JavaRDD<String[]> recordRDD;
 
-        if (isSequenceFile) {
-            recordRDD = sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
-                    .map(new Function<Text, String[]>() {
-                        @Override
-                        public String[] call(Text text) throws Exception {
-                            String s = Bytes.toString(text.getBytes(), 0, text.getLength());
-                            return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
-                        }
-                    });
+        if (isSequenceFile && HadoopUtil.isSequenceDir(sc.hadoopConfiguration(), new Path(inputPath))) {
+            recordRDD = getSequenceFormatHiveInput(sc, inputPath);
         } else {
-            SparkSession sparkSession = SparkSession.builder().config(sc.getConf()).enableHiveSupport().getOrCreate();
-            final Dataset intermediateTable = sparkSession.table(hiveTable);
-            recordRDD = intermediateTable.javaRDD().map(new Function<Row, String[]>() {
-                @Override
-                public String[] call(Row row) throws Exception {
-                    String[] result = new String[row.size()];
-                    for (int i = 0; i < row.size(); i++) {
-                        final Object o = row.get(i);
-                        if (o != null) {
-                            result[i] = o.toString();
-                        } else {
-                            result[i] = null;
-                        }
-                    }
-                    return result;
-                }
-            });
+            recordRDD = getOtherFormatHiveInput(sc, hiveTable);
         }
 
         return recordRDD;
     }
 
+    private static JavaRDD<String[]> getSequenceFormatHiveInput(JavaSparkContext sc, String inputPath) {
+        return sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
+                .map(new Function<Text, String[]>() {
+                    @Override
+                    public String[] call(Text text) throws Exception {
+                        String s = Bytes.toString(text.getBytes(), 0, text.getLength());
+                        return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
+                    }
+                });
+    }
+
+    private static JavaRDD<String[]> getOtherFormatHiveInput(JavaSparkContext sc, String hiveTable) {
+        SparkSession sparkSession = SparkSession.builder().config(sc.getConf()).enableHiveSupport().getOrCreate();
+        final Dataset intermediateTable = sparkSession.table(hiveTable);
+        return intermediateTable.javaRDD().map(new Function<Row, String[]>() {
+            @Override
+            public String[] call(Row row) throws Exception {
+                String[] result = new String[row.size()];
+                for (int i = 0; i < row.size(); i++) {
+                    final Object o = row.get(i);
+                    if (o != null) {
+                        result[i] = o.toString();
+                    } else {
+                        result[i] = null;
+                    }
+                }
+                return result;
+            }
+        });
+    }
+
 }
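
For reference, a minimal sketch (illustrative only, not part of the commit) of how the new HadoopUtil.isSequenceDir check can be exercised on its own; the class name and the directory argument are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.kylin.common.util.HadoopUtil;

    public class IsSequenceDirCheck {  // hypothetical class name, not in the commit
        public static void main(String[] args) throws Exception {
            Configuration conf = HadoopUtil.getCurrentConfiguration();
            // args[0]: a flat-table directory, e.g. one materialized from a JDBC data source
            Path flatTableDir = new Path(args[0]);
            // true only if the first non-_SUCCESS file in the directory opens as a SequenceFile;
            // for a text-format flat table this returns false, so hiveRecordInputRDD falls back
            // to reading the Hive table through SparkSession instead of sc.sequenceFile(...)
            System.out.println(HadoopUtil.isSequenceDir(conf, flatTableDir));
        }
    }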
