[
https://issues.apache.org/jira/browse/HIVE-23956?focusedWorklogId=465743&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-465743
]
ASF GitHub Bot logged work on HIVE-23956:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Aug/20 15:09
Start Date: 03/Aug/20 15:09
Worklog Time Spent: 10m
Work Description: pvargacl commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r464476338
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
##########
@@ -118,70 +126,217 @@
*/
private long visibilityTxnId;
+ private List<DeltaFileMetaData> deltaFiles;
+
public DeltaMetaData() {
- this(0,0,new ArrayList<Integer>(), 0);
+ this(0, 0, new ArrayList<>(), 0, new ArrayList<>());
}
+
/**
+ * @param minWriteId min writeId of the delta directory
+ * @param maxWriteId max writeId of the delta directory
* @param stmtIds delta dir suffixes when a single txn writes > 1 delta in
the same partition
* @param visibilityTxnId maybe 0, if the dir name didn't have it.
txnid:0 is always visible
+ * @param deltaFiles bucketFiles in the directory
*/
- DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds,
long visibilityTxnId) {
+ public DeltaMetaData(long minWriteId, long maxWriteId, List<Integer>
stmtIds, long visibilityTxnId,
+ List<DeltaFileMetaData> deltaFiles) {
this.minWriteId = minWriteId;
this.maxWriteId = maxWriteId;
if (stmtIds == null) {
throw new IllegalArgumentException("stmtIds == null");
}
this.stmtIds = stmtIds;
this.visibilityTxnId = visibilityTxnId;
+ this.deltaFiles = ObjectUtils.defaultIfNull(deltaFiles, new
ArrayList<>());
}
- long getMinWriteId() {
+
+ public long getMinWriteId() {
return minWriteId;
}
- long getMaxWriteId() {
+
+ public long getMaxWriteId() {
return maxWriteId;
}
- List<Integer> getStmtIds() {
+
+ public List<Integer> getStmtIds() {
return stmtIds;
}
- long getVisibilityTxnId() {
+
+ public long getVisibilityTxnId() {
return visibilityTxnId;
}
+
+ public List<DeltaFileMetaData> getDeltaFiles() {
+ return deltaFiles;
+ }
+
+ public List<DeltaFileMetaData> getDeltaFilesForStmtId(final Integer
stmtId) {
+ if (stmtIds.size() <= 1 || stmtId == null) {
+ // If it is not a multistatement delta, we do not store the stmtId in
the file list
+ return deltaFiles;
+ } else {
+ return deltaFiles.stream().filter(df ->
stmtId.equals(df.getStmtId())).collect(Collectors.toList());
+ }
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(minWriteId);
out.writeLong(maxWriteId);
out.writeInt(stmtIds.size());
- for(Integer id : stmtIds) {
+ for (Integer id : stmtIds) {
out.writeInt(id);
}
out.writeLong(visibilityTxnId);
+ out.writeInt(deltaFiles.size());
+ for (DeltaFileMetaData fileMeta : deltaFiles) {
+ fileMeta.write(out);
+ }
}
+
@Override
public void readFields(DataInput in) throws IOException {
minWriteId = in.readLong();
maxWriteId = in.readLong();
stmtIds.clear();
int numStatements = in.readInt();
- for(int i = 0; i < numStatements; i++) {
+ for (int i = 0; i < numStatements; i++) {
stmtIds.add(in.readInt());
}
visibilityTxnId = in.readLong();
+
+ deltaFiles.clear();
+ int numFiles = in.readInt();
+ for (int i = 0; i < numFiles; i++) {
+ DeltaFileMetaData file = new DeltaFileMetaData();
+ file.readFields(in);
+ deltaFiles.add(file);
+ }
}
- String getName() {
+
+ private String getName() {
assert stmtIds.isEmpty() : "use getName(int)";
- return AcidUtils.addVisibilitySuffix(AcidUtils
- .deleteDeltaSubdir(minWriteId, maxWriteId), visibilityTxnId);
+ return
AcidUtils.addVisibilitySuffix(AcidUtils.deleteDeltaSubdir(minWriteId,
maxWriteId), visibilityTxnId);
}
- String getName(int stmtId) {
+
+ private String getName(int stmtId) {
assert !stmtIds.isEmpty() : "use getName()";
return AcidUtils.addVisibilitySuffix(AcidUtils
.deleteDeltaSubdir(minWriteId, maxWriteId, stmtId), visibilityTxnId);
}
+
+ public List<Pair<Path, Integer>> getPaths(Path root) {
+ if (stmtIds.isEmpty()) {
+ return Collections.singletonList(new ImmutablePair<>(new Path(root,
getName()), null));
+ } else {
+ // To support multistatement transactions we may have multiple
directories corresponding to one DeltaMetaData
+ return getStmtIds().stream()
+ .map(stmtId -> new ImmutablePair<>(new Path(root,
getName(stmtId)), stmtId)).collect(Collectors.toList());
+ }
+ }
+
@Override
public String toString() {
return "Delta(?," + minWriteId + "," + maxWriteId + "," + stmtIds + ","
+ visibilityTxnId + ")";
}
}
+
+ final class DeltaFileMetaData implements Writable {
+ private static final int HAS_LONG_FILEID_FLAG = 1;
+ private static final int HAS_ATTEMPTID_FLAG = 2;
+ private static final int HAS_STMTID_FLAG = 4;
+
+ private long modTime;
+ private long length;
+ // Optional
+ private Integer attemptId;
+ // Optional
+ private Long fileId;
+ // Optional, if the deltaMeta contains multiple stmtIds, it will contain
this files parent's stmtId
+ private Integer stmtId;
+
+ public DeltaFileMetaData() {
+ }
+
+ public DeltaFileMetaData(HadoopShims.HdfsFileStatusWithId fileStatus,
Integer stmtId) {
+ modTime = fileStatus.getFileStatus().getModificationTime();
+ length = fileStatus.getFileStatus().getLen();
+ String attempt =
AcidUtils.parseAttemptId(fileStatus.getFileStatus().getPath());
+ attemptId = StringUtils.isEmpty(attempt) ? null :
Integer.parseInt(attempt);
+ fileId = fileStatus.getFileId();
+ this.stmtId = stmtId;
+ }
+
+ public DeltaFileMetaData(long modTime, long length, @Nullable Integer
attemptId, @Nullable Long fileId,
+ @Nullable Integer stmtId) {
+ this.modTime = modTime;
+ this.length = length;
+ this.attemptId = attemptId;
+ this.fileId = fileId;
+ this.stmtId = stmtId;
+ }
+
+ public void clearStmtId() {
+ stmtId = null;
+ }
+
+ public Integer getStmtId() {
+ return stmtId;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ int flags = (fileId != null ? HAS_LONG_FILEID_FLAG : 0) |
+ (attemptId != null ? HAS_ATTEMPTID_FLAG : 0) |
+ (stmtId != null ? HAS_STMTID_FLAG : 0);
+ out.writeByte(flags);
+ out.writeLong(modTime);
+ out.writeLong(length);
+ if (attemptId != null) {
+ out.writeInt(attemptId);
+ }
+ if (fileId != null) {
+ out.writeLong(fileId);
+ }
+ if (stmtId != null) {
+ out.writeInt(stmtId);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ byte flags = in.readByte();
+ boolean hasLongFileId = (HAS_LONG_FILEID_FLAG & flags) != 0,
+ hasAttemptId = (HAS_ATTEMPTID_FLAG & flags) != 0,
+ hasStmtId = (HAS_STMTID_FLAG & flags) != 0;
+ modTime = in.readLong();
+ length = in.readLong();
+ if (hasAttemptId) {
+ attemptId = in.readInt();
+ }
+ if (hasLongFileId) {
+ fileId = in.readLong();
+ }
+ if (hasStmtId) {
+ stmtId = in.readInt();
+ }
+ }
+
+ public Object getFileId(Path deltaDirectory, int bucketId) {
+ if (fileId != null) {
+ return fileId;
+ }
+ // Calculate the synthetic fileid
+ Path realPath = getPath(deltaDirectory, bucketId);
+ return new SyntheticFileId(realPath, length, modTime);
+ }
Review comment:
I will add the forceSynthetic parameter; it has a valid use-case
(https://issues.apache.org/jira/browse/HIVE-20338), but I have a problem with
this: boolean allowSynthetic = HiveConf.getBoolVar(daemonConf,
ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID);
If someone disables this, it will render the LLAP cache useless; even worse,
your OrcTail cache will just throw an IllegalCacheConfigurationException.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 465743)
Time Spent: 4h 20m (was: 4h 10m)
> Delete delta directory file information should be pushed to execution side
> --------------------------------------------------------------------------
>
> Key: HIVE-23956
> URL: https://issues.apache.org/jira/browse/HIVE-23956
> Project: Hive
> Issue Type: Improvement
> Reporter: Peter Varga
> Assignee: Peter Varga
> Priority: Major
> Labels: pull-request-available
> Time Spent: 4h 20m
> Remaining Estimate: 0h
>
> Since HIVE-23840 LLAP cache is used to retrieve the tail of the ORC bucket
> files in the delete deltas, but to use the cache the fileId must be
> determined, so one more FileSystem call is issued for each bucket.
> This fileId is already available during compilation in the AcidState
> calculation, we should serialise this to the OrcSplit, and remove the
> unnecessary FS calls.
> Furthermore, instead of sending the SyntheticFileId directly, we should pass
> the attemptId instead of the standard path hash; this way the path and the
> SyntheticFileId can be calculated, and it will keep working even if move-free
> delete operations are introduced.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)