deniskuzZ commented on a change in pull request #1884:
URL: https://github.com/apache/hive/pull/1884#discussion_r599343997
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
##########
@@ -1117,38 +1104,93 @@ public Options clone() {
}
baseReader = pair.getRecordReader();
}
- /*now process the delta files. For normal read these should only be delete deltas. For
- * Compaction these may be any delta_x_y/. The files inside any delta_x_y/ may be in Acid
- * format (i.e. with Acid metadata columns) or 'original'.*/
+
+ processDeltaDirs(conf, bucket, options, deltaDirectory, mergerOptions, deltasToAttemptId, eventOptions);
+
+ // get the first record
+ LOG.debug("Final reader map {}", readers);
+ Map.Entry<ReaderKey, ReaderPair> entry = readers.pollFirstEntry();
+ if (entry == null) {
+ columns = 0;
+ primary = null;
+ } else {
+ primary = entry.getValue();
+ if (readers.isEmpty()) {
+ secondaryKey = null;
+ } else {
+ secondaryKey = readers.firstKey();
+ }
+ // get the number of columns in the user's rows
+ columns = primary.getColumns();
+ }
+ }
+
+ private ReaderPair createOriginalReaderPair(Configuration conf, Reader reader, int bucket,
+ org.apache.orc.Reader.Options options, OrcRawRecordMerger.Options mergerOptions,
+ OrcRawRecordMerger.KeyInterval keyInterval, ReaderKey baseKey) throws IOException {
+ ReaderPair pair;
+ if (mergerOptions.isCompacting()) {
+ assert mergerOptions.isMajorCompaction();
+ Options readerPairOptions = mergerOptions;
+ if(mergerOptions.getBaseDir().getName().startsWith(AcidUtils.BASE_PREFIX)) {
+ readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
+ AcidUtils.ParsedBaseLight.parseBase(mergerOptions.getBaseDir()).getWriteId(),
+ mergerOptions.getBaseDir());
+ }
+ pair = new OriginalReaderPairToCompact(baseKey, bucket, options, readerPairOptions,
+ conf, validWriteIdList,
+ 0); // 0 since base_x doesn't have a suffix (neither does pre acid write)
+ } else {
+ assert mergerOptions.getBucketPath() != null : " since this is not compaction: " + mergerOptions.getRootPath();
+ //if here it's a non-acid schema file - check if from before table was marked transactional
+ //or in base_x/delta_x_x from Load Data
+ TransactionMetaData tfp = TransactionMetaData
+ .findWriteIDForSynthetcRowIDs(mergerOptions.getBucketPath(), mergerOptions.getRootPath(), conf);
+ Options readerPairOptions = mergerOptions;
+ if (tfp.syntheticWriteId > 0) {
+ readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
+ tfp.syntheticWriteId, tfp.folder);
+ }
+ pair = new OriginalReaderPairToRead(baseKey, reader, bucket, keyInterval.getMinKey(),
+ keyInterval.getMaxKey(), options, readerPairOptions, conf, validWriteIdList, tfp.statementId);
+ }
+ return pair;
+ }
+
+ private void processDeltaDirs(Configuration conf, int bucket, org.apache.orc.Reader.Options options,
+ Path[] deltaDirectory, OrcRawRecordMerger.Options mergerOptions, Map<String, Integer> deltasToAttemptId,
+ org.apache.orc.Reader.Options eventOptions) throws IOException {
Review comment:
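Is there a package clash here that requires the qualified names `org.apache.orc.Reader.Options` and `OrcRawRecordMerger.Options` in this signature?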
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org