This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 14a2a661061 [fix](paimon) fix failure to read Paimon data from HDFS with HA (#39806) (#39876)
14a2a661061 is described below
commit 14a2a661061a471972375e4d1256bd8bfdaa48c6
Author: Mingyu Chen <[email protected]>
AuthorDate: Sat Aug 24 17:51:15 2024 +0800
[fix](paimon) fix failure to read Paimon data from HDFS with HA (#39806) (#39876)
Backport of #39806.
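Background: before this change, only the Paimon catalog options were forwarded to the BE-side JNI scanner, so HDFS HA client settings (dfs.nameservices and friends) never reached the Hadoop Configuration used to open the Paimon catalog, and reads against an HA nameservice failed. The patch forwards the catalog's Hadoop properties through the new TPaimonFileDesc.hadoop_conf field, prefixes them with "hadoop." on the BE, and rebuilds a Configuration on the Java side. The sketch below is illustrative only (class name, keys, and values are hypothetical examples, not part of the patch) and shows how the prefixed params end up in the catalog:

    // Sketch only: splitting the "paimon." / "hadoop." prefixed params and
    // applying them when opening the Paimon catalog. Values are hypothetical.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.CatalogContext;
    import org.apache.paimon.catalog.CatalogFactory;
    import org.apache.paimon.options.Options;

    import java.util.HashMap;
    import java.util.Map;

    public class PaimonHaConfSketch {
        public static void main(String[] args) {
            // Params as they arrive from the BE, already prefixed.
            Map<String, String> params = new HashMap<>();
            params.put("paimon.warehouse", "hdfs://ns1/paimon");      // hypothetical
            params.put("hadoop.dfs.nameservices", "ns1");             // hypothetical
            params.put("hadoop.dfs.ha.namenodes.ns1", "nn1,nn2");     // hypothetical

            Options options = new Options();
            Configuration hadoopConf = new Configuration();
            for (Map.Entry<String, String> kv : params.entrySet()) {
                if (kv.getKey().startsWith("paimon.")) {
                    options.set(kv.getKey().substring("paimon.".length()), kv.getValue());
                } else if (kv.getKey().startsWith("hadoop.")) {
                    hadoopConf.set(kv.getKey().substring("hadoop.".length()), kv.getValue());
                }
            }
            // With the HA properties present, "hdfs://ns1/..." is resolvable.
            CatalogContext context = CatalogContext.create(options, hadoopConf);
            Catalog catalog = CatalogFactory.createCatalog(context);
        }
    }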
---
be/src/vec/exec/format/table/paimon_jni_reader.cpp | 8 +++++++-
be/src/vec/exec/format/table/paimon_jni_reader.h | 1 +
.../org/apache/doris/paimon/PaimonJniScanner.java | 13 +++++++++++--
.../org/apache/doris/paimon/PaimonTableCache.java | 22 ++++++++++++++++++----
.../datasource/paimon/source/PaimonScanNode.java | 2 ++
gensrc/thrift/PlanNodes.thrift | 1 +
6 files changed, 40 insertions(+), 7 deletions(-)
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
index ef690c15b68..fa73454f4b4 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
@@ -35,7 +35,8 @@ class Block;
namespace doris::vectorized {
-const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = "paimon_option_prefix.";
+const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = "paimon.";
+const std::string PaimonJniReader::HADOOP_OPTION_PREFIX = "hadoop.";
PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
RuntimeState* state, RuntimeProfile* profile,
@@ -65,6 +66,11 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
for (auto& kv : range.table_format_params.paimon_params.paimon_options) {
params[PAIMON_OPTION_PREFIX + kv.first] = kv.second;
}
+ if (range.table_format_params.paimon_params.__isset.hadoop_conf) {
+ for (auto& kv : range.table_format_params.paimon_params.hadoop_conf) {
+ params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
+ }
+ }
_jni_connector = std::make_unique<JniConnector>("org/apache/doris/paimon/PaimonJniScanner", params, column_names);
}
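For context: the BE flattens both option sets into the single params map handed to the JNI connector, switching the catalog-option prefix from "paimon_option_prefix." to "paimon." and introducing "hadoop." for Hadoop properties. A hypothetical params map after this change:

    paimon.warehouse             -> hdfs://ns1/paimon
    hadoop.dfs.nameservices      -> ns1
    hadoop.dfs.ha.namenodes.ns1  -> nn1,nn2

The Java scanner (hunks below) strips these prefixes again to rebuild the two maps.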
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.h b/be/src/vec/exec/format/table/paimon_jni_reader.h
index 6b6a6907270..6ecf6cd1f15 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.h
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.h
@@ -51,6 +51,7 @@ class PaimonJniReader : public JniReader {
public:
static const std::string PAIMON_OPTION_PREFIX;
+ static const std::string HADOOP_OPTION_PREFIX;
PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state, RuntimeProfile* profile, const TFileRangeDesc& range);
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 719a7ea0b9d..4ef40d9fa1a 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -42,9 +42,12 @@ import java.util.stream.Collectors;
public class PaimonJniScanner extends JniScanner {
private static final Logger LOG = LoggerFactory.getLogger(PaimonJniScanner.class);
- private static final String PAIMON_OPTION_PREFIX = "paimon_option_prefix.";
+ private static final String PAIMON_OPTION_PREFIX = "paimon.";
+ private static final String HADOOP_OPTION_PREFIX = "hadoop.";
+
private final Map<String, String> params;
private final Map<String, String> paimonOptionParams;
+ private final Map<String, String> hadoopOptionParams;
private final String dbName;
private final String tblName;
private final String paimonSplit;
@@ -87,6 +90,10 @@ public class PaimonJniScanner extends JniScanner {
.filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX))
.collect(Collectors
.toMap(kv1 -> kv1.getKey().substring(PAIMON_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
+ hadoopOptionParams = params.entrySet().stream()
+ .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
+ .collect(Collectors
+ .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
}
@Override
@@ -207,7 +214,8 @@ public class PaimonJniScanner extends JniScanner {
}
private void initTable() {
- PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId, paimonOptionParams, dbName, tblName);
+ PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId,
+ paimonOptionParams, hadoopOptionParams, dbName, tblName);
TableExt tableExt = PaimonTableCache.getTable(key);
if (tableExt.getCreateTime() < lastUpdateTime) {
LOG.warn("invalidate cache table:{}, localTime:{}, remoteTime:{}",
key, tableExt.getCreateTime(),
@@ -223,3 +231,4 @@ public class PaimonJniScanner extends JniScanner {
}
}
+
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
index f57ffeb5592..12aac153392 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
@@ -21,6 +21,7 @@ import com.google.common.base.Objects;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import org.apache.hadoop.conf.Configuration;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
@@ -54,7 +55,7 @@ public class PaimonTableCache {
private static TableExt loadTable(PaimonTableCacheKey key) {
try {
LOG.warn("load table:{}", key);
- Catalog catalog = createCatalog(key.getPaimonOptionParams());
+ Catalog catalog = createCatalog(key.getPaimonOptionParams(), key.getHadoopOptionParams());
Table table = catalog.getTable(Identifier.create(key.getDbName(), key.getTblName()));
return new TableExt(table, System.currentTimeMillis());
} catch (Catalog.TableNotExistException e) {
@@ -63,10 +64,14 @@ public class PaimonTableCache {
}
}
- private static Catalog createCatalog(Map<String, String> paimonOptionParams) {
+ private static Catalog createCatalog(
+ Map<String, String> paimonOptionParams,
+ Map<String, String> hadoopOptionParams) {
Options options = new Options();
paimonOptionParams.entrySet().stream().forEach(kv -> options.set(kv.getKey(), kv.getValue()));
- CatalogContext context = CatalogContext.create(options);
+ Configuration hadoopConf = new Configuration();
+ hadoopOptionParams.entrySet().stream().forEach(kv -> hadoopConf.set(kv.getKey(), kv.getValue()));
+ CatalogContext context = CatalogContext.create(options, hadoopConf);
return CatalogFactory.createCatalog(context);
}
@@ -108,15 +113,19 @@ public class PaimonTableCache {
// not in key
private Map<String, String> paimonOptionParams;
+ private Map<String, String> hadoopOptionParams;
private String dbName;
private String tblName;
public PaimonTableCacheKey(long ctlId, long dbId, long tblId,
- Map<String, String> paimonOptionParams, String dbName, String tblName) {
+ Map<String, String> paimonOptionParams,
+ Map<String, String> hadoopOptionParams,
+ String dbName, String tblName) {
this.ctlId = ctlId;
this.dbId = dbId;
this.tblId = tblId;
this.paimonOptionParams = paimonOptionParams;
+ this.hadoopOptionParams = hadoopOptionParams;
this.dbName = dbName;
this.tblName = tblName;
}
@@ -137,6 +146,10 @@ public class PaimonTableCache {
return paimonOptionParams;
}
+ public Map<String, String> getHadoopOptionParams() {
+ return hadoopOptionParams;
+ }
+
public String getDbName() {
return dbName;
}
@@ -171,6 +184,7 @@ public class PaimonTableCache {
+ ", dbId=" + dbId
+ ", tblId=" + tblId
+ ", paimonOptionParams=" + paimonOptionParams
+ + ", hadoopOptionParams=" + hadoopOptionParams
+ ", dbName='" + dbName + '\''
+ ", tblName='" + tblName + '\''
+ '}';
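Note on createCatalog: CatalogContext.create(options) uses a Configuration that carries none of these catalog-level properties, while CatalogContext.create(options, hadoopConf) lets the forwarded settings take effect. A minimal sketch of the standard HDFS HA client settings that typically need to reach this Configuration (nameservice "ns1", node names, hosts, and ports are hypothetical):

    Configuration hadoopConf = new Configuration();
    hadoopConf.set("dfs.nameservices", "ns1");
    hadoopConf.set("dfs.ha.namenodes.ns1", "nn1,nn2");
    hadoopConf.set("dfs.namenode.rpc-address.ns1.nn1", "host1:8020");
    hadoopConf.set("dfs.namenode.rpc-address.ns1.nn2", "host2:8020");
    hadoopConf.set("dfs.client.failover.proxy.provider.ns1",
            "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

Without such entries, an hdfs://ns1/... path cannot be resolved to an active NameNode.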
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 27b40b5bcc9..ad8017a4f4c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -164,6 +164,8 @@ public class PaimonScanNode extends FileQueryScanNode {
fileDesc.setDbId(((PaimonExternalTable) source.getTargetTable()).getDbId());
fileDesc.setTblId(source.getTargetTable().getId());
fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime());
+ // The hadoop conf should be the same as PaimonExternalCatalog.createCatalog()#getConfiguration()
+ fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getHadoopProperties());
Optional<DeletionFile> optDeletionFile = paimonSplit.getDeletionFile();
if (optDeletionFile.isPresent()) {
DeletionFile deletionFile = optDeletionFile.get();
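On the FE side, the scan node now attaches the catalog's Hadoop properties to every file descriptor, mirroring the Configuration that PaimonExternalCatalog builds when it opens the catalog on the FE. This keeps FE planning and BE/JNI reading pointed at the same (possibly HA) filesystem; for an HA setup these are properties like the dfs.* entries sketched above.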
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index fc1a6e6baf5..a060f5efab4 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -327,6 +327,7 @@ struct TPaimonFileDesc {
10: optional i64 last_update_time
11: optional string file_format
12: optional TPaimonDeletionFileDesc deletion_file;
+ 13: optional map<string, string> hadoop_conf
}
struct TMaxComputeFileDesc {