This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch bug/hive
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 1b6ff2f481cab9692ab4a7037ca0eeda04656520 Author: JackieTien97 <[email protected]> AuthorDate: Mon Jul 13 12:38:03 2020 +0800 solve hive-connector bug --- .../apache/iotdb/hadoop/tsfile/TSFInputFormat.java | 23 ++++++++++++++-------- .../org/apache/iotdb/hive/TSFHiveInputFormat.java | 11 +++++++---- .../java/org/apache/iotdb/hive/TsFileSerDe.java | 16 +++++++++++---- 3 files changed, 34 insertions(+), 16 deletions(-) diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFInputFormat.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFInputFormat.java index 4808af2..6df9a0e 100644 --- a/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFInputFormat.java +++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFInputFormat.java @@ -18,6 +18,8 @@ */ package org.apache.iotdb.hadoop.tsfile; +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; @@ -73,7 +75,7 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> { /** * Set the deltaObjectIds which want to be read * - * @param job hadoop job + * @param job hadoop job * @param value the deltaObjectIds will be read * @throws TSFHadoopException */ @@ -95,8 +97,8 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> { * Get the deltaObjectIds which want to be read * * @param configuration - * @return List of device, if configuration has been set the deviceIds. - * null, if configuration has not been set the deviceIds. + * @return List of device, if configuration has been set the deviceIds. null, if configuration has + * not been set the deviceIds. 
*/ public static List<String> getReadDeviceIds(Configuration configuration) { String deviceIds = configuration.get(READ_DELTAOBJECTS); @@ -111,7 +113,7 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> { /** * Set the measurementIds which want to be read * - * @param job hadoop job + * @param job hadoop job * @param value the measurementIds will be read * @throws TSFHadoopException */ @@ -236,11 +238,13 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> { @Override public List<InputSplit> getSplits(JobContext job) throws IOException { + job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE, true); List<FileStatus> listFileStatus = super.listStatus(job); return new ArrayList<>(getTSFInputSplit(job.getConfiguration(), listFileStatus, logger)); } - public static List<TSFInputSplit> getTSFInputSplit(Configuration configuration, List<FileStatus> listFileStatus, Logger logger) throws IOException { + public static List<TSFInputSplit> getTSFInputSplit(Configuration configuration, + List<FileStatus> listFileStatus, Logger logger) throws IOException { BlockLocation[] blockLocations; List<TSFInputSplit> splits = new ArrayList<>(); // get the all file in the directory @@ -250,6 +254,9 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> { logger.info("The file path is {}", fileStatus.getPath()); // Get the file path Path path = fileStatus.getPath(); + if (!path.toString().endsWith(TSFILE_SUFFIX)) { + continue; + } // Get the file length long length = fileStatus.getLen(); // Check the file length. 
if the length is less than 0, return the @@ -273,8 +280,7 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> { } /** - * get the TSFInputSplit from tsfMetaData and hdfs block location - * information with the filter + * get the TSFInputSplit from tsfMetaData and hdfs block location information with the filter * * @throws IOException */ @@ -282,7 +288,8 @@ TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> { throws IOException { List<TSFInputSplit> splits = new ArrayList<>(); for (BlockLocation blockLocation : blockLocations) { - splits.add(new TSFInputSplit(path, blockLocation.getHosts(), blockLocation.getOffset(), blockLocation.getLength())); + splits.add(new TSFInputSplit(path, blockLocation.getHosts(), blockLocation.getOffset(), + blockLocation.getLength())); } return splits; } diff --git a/hive-connector/src/main/java/org/apache/iotdb/hive/TSFHiveInputFormat.java b/hive-connector/src/main/java/org/apache/iotdb/hive/TSFHiveInputFormat.java index bb2d5ca..cf7b3e0 100644 --- a/hive-connector/src/main/java/org/apache/iotdb/hive/TSFHiveInputFormat.java +++ b/hive-connector/src/main/java/org/apache/iotdb/hive/TSFHiveInputFormat.java @@ -29,8 +29,8 @@ import java.io.IOException; import java.util.Arrays; /** - * The class implement is same as {@link org.apache.iotdb.hadoop.tsfile.TSFInputFormat} - * and is customized for Hive to implements JobConfigurable interface. + * The class implement is same as {@link org.apache.iotdb.hadoop.tsfile.TSFInputFormat} and is + * customized for Hive to implements JobConfigurable interface. 
*/ public class TSFHiveInputFormat extends FileInputFormat<NullWritable, MapWritable> { @@ -39,13 +39,16 @@ public class TSFHiveInputFormat extends FileInputFormat<NullWritable, MapWritabl @Override - public RecordReader<NullWritable, MapWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { + public RecordReader<NullWritable, MapWritable> getRecordReader(InputSplit split, JobConf job, + Reporter reporter) throws IOException { return new TSFHiveRecordReader(split, job); } @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - return TSFInputFormat.getTSFInputSplit(job, Arrays.asList(super.listStatus(job)), logger).toArray(new InputSplit[0]); + job.setBoolean(INPUT_DIR_RECURSIVE, true); + return TSFInputFormat.getTSFInputSplit(job, Arrays.asList(super.listStatus(job)), logger) + .toArray(new InputSplit[0]); } } diff --git a/hive-connector/src/main/java/org/apache/iotdb/hive/TsFileSerDe.java b/hive-connector/src/main/java/org/apache/iotdb/hive/TsFileSerDe.java index 9263f2a..35d94e9 100644 --- a/hive-connector/src/main/java/org/apache/iotdb/hive/TsFileSerDe.java +++ b/hive-connector/src/main/java/org/apache/iotdb/hive/TsFileSerDe.java @@ -18,6 +18,15 @@ */ package org.apache.iotdb.hive; +import static org.apache.iotdb.hadoop.tsfile.TSFInputFormat.READ_DELTAOBJECTS; +import static org.apache.iotdb.hadoop.tsfile.TSFInputFormat.READ_MEASUREMENTID; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import javax.annotation.Nullable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.serde.serdeConstants; @@ -36,9 +45,6 @@ import org.apache.iotdb.hadoop.tsfile.record.HDFSTSRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; -import java.util.*; - public class TsFileSerDe extends 
AbstractSerDe { private static final Logger logger = LoggerFactory.getLogger(TsFileSerDe.class); @@ -63,7 +69,6 @@ public class TsFileSerDe extends AbstractSerDe { deviceId = tbl.getProperty(DEVICE_ID); - if (columnNameProperty == null || columnNameProperty.isEmpty() || columnTypeProperty == null || columnTypeProperty.isEmpty()) { columnNames = Collections.emptyList(); @@ -80,6 +85,9 @@ public class TsFileSerDe extends AbstractSerDe { throw new TsFileSerDeException("len(columnNames) != len(columnTypes)"); } + conf.set(READ_DELTAOBJECTS, deviceId); + conf.set(READ_MEASUREMENTID, columnNames.get(1)); + oi = createObjectInspector(); }
