This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 08353f60278 [Enhance](fe) Iceberg table in HMS catalog supports broker
scan (#28107)
08353f60278 is described below
commit 08353f6027890bf95338afbff95156c3f270023a
Author: DuRipeng <[email protected]>
AuthorDate: Wed Jan 3 11:29:12 2024 +0800
[Enhance](fe) Iceberg table in HMS catalog supports broker scan (#28107)
My organization uses HMS catalog to accelerate lake queries. Since we have a
custom distributed file system that is hard to integrate into FE / BE, we introduced
HMS Catalog broker scan support (#24830) and implemented the custom distributed file
system adaptation in the broker.
We want to expand the scope of use to Iceberg table scan in HMS Catalog.
This PR introduces the broker-scan-related `IcebergBrokerIO`, `BrokerInputFile`,
and `BrokerInputStream` classes for Iceberg table scans.
---
docs/en/docs/lakehouse/multi-catalog/hive.md | 7 +
docs/zh-CN/docs/lakehouse/multi-catalog/hive.md | 6 +
.../datasource/iceberg/broker/BrokerInputFile.java | 74 +++++++++
.../iceberg/broker/BrokerInputStream.java | 169 +++++++++++++++++++++
.../datasource/iceberg/broker/IcebergBrokerIO.java | 80 ++++++++++
.../doris/planner/external/FileQueryScanNode.java | 10 +-
.../external/iceberg/IcebergMetadataCache.java | 17 ++-
7 files changed, 355 insertions(+), 8 deletions(-)
diff --git a/docs/en/docs/lakehouse/multi-catalog/hive.md
b/docs/en/docs/lakehouse/multi-catalog/hive.md
index 25fddea1250..01c78d3599f 100644
--- a/docs/en/docs/lakehouse/multi-catalog/hive.md
+++ b/docs/en/docs/lakehouse/multi-catalog/hive.md
@@ -391,6 +391,13 @@ Add following setting when creating an HMS catalog, file
splitting and scanning
"broker.name" = "test_broker"
```
+
+Doris has implemented Broker query support for HMS Catalog Iceberg based on
the Iceberg `FileIO` interface. If needed, the following configuration can be
added when creating the HMS Catalog.
+
+```sql
+"io-impl" = "org.apache.doris.datasource.iceberg.broker.IcebergBrokerIO"
+```
+
## Integrate with Apache Ranger
Apache Ranger is a security framework for monitoring, enabling services, and
comprehensive data security access management on the Hadoop platform.
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md
b/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md
index e75977c25f4..b4efb291f89 100644
--- a/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md
+++ b/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md
@@ -373,6 +373,12 @@ CREATE CATALOG hive PROPERTIES (
"broker.name" = "test_broker"
```
+Doris 基于 Iceberg `FileIO` 接口实现了 Broker 查询 HMS Catalog Iceberg 的支持。如有需求,可以在创建
HMS Catalog 时增加如下配置。
+
+```sql
+"io-impl" = "org.apache.doris.datasource.iceberg.broker.IcebergBrokerIO"
+```
+
## 使用 Ranger 进行权限校验
Apache Ranger是一个用来在Hadoop平台上进行监控,启用服务,以及全方位数据安全访问管理的安全框架。
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputFile.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputFile.java
new file mode 100644
index 00000000000..529df6c0af1
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputFile.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.broker;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.common.util.BrokerReader;
+import org.apache.doris.thrift.TBrokerFD;
+
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+
+import java.io.IOException;
+
+public class BrokerInputFile implements InputFile {
+
+ private Long fileLength = null;
+
+ private final String filePath;
+ private final BrokerDesc brokerDesc;
+ private BrokerReader reader;
+ private TBrokerFD fd;
+
+ private BrokerInputFile(String filePath, BrokerDesc brokerDesc) {
+ this.filePath = filePath;
+ this.brokerDesc = brokerDesc;
+ }
+
+ private void init() throws IOException {
+ this.reader = BrokerReader.create(this.brokerDesc);
+ this.fileLength = this.reader.getFileLength(filePath);
+ this.fd = this.reader.open(filePath);
+ }
+
+ public static BrokerInputFile create(String filePath, BrokerDesc
brokerDesc) throws IOException {
+ BrokerInputFile inputFile = new BrokerInputFile(filePath, brokerDesc);
+ inputFile.init();
+ return inputFile;
+ }
+
+ @Override
+ public long getLength() {
+ return fileLength;
+ }
+
+ @Override
+ public SeekableInputStream newStream() {
+ return new BrokerInputStream(this.reader, this.fd, this.fileLength);
+ }
+
+ @Override
+ public String location() {
+ return filePath;
+ }
+
+ @Override
+ public boolean exists() {
+ return fileLength != null;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputStream.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputStream.java
new file mode 100644
index 00000000000..b9f6d30aed4
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputStream.java
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.broker;
+
+import org.apache.doris.common.util.BrokerReader;
+import org.apache.doris.thrift.TBrokerFD;
+
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+
+public class BrokerInputStream extends SeekableInputStream {
+ private static final Logger LOG =
LogManager.getLogger(BrokerInputStream.class);
+ private static final int COPY_BUFFER_SIZE = 1024 * 1024; // 1MB
+
+ private final byte[] tmpBuf = new byte[COPY_BUFFER_SIZE];
+ private long currentPos = 0;
+ private long markPos = 0;
+
+ private long bufferOffset = 0;
+ private long bufferLimit = 0;
+ private final BrokerReader reader;
+ private final TBrokerFD fd;
+ private final long fileLength;
+
+ public BrokerInputStream(BrokerReader reader, TBrokerFD fd, long
fileLength) {
+ this.fd = fd;
+ this.reader = reader;
+ this.fileLength = fileLength;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return currentPos;
+ }
+
+ @Override
+ public void seek(long newPos) throws IOException {
+ currentPos = newPos;
+ }
+
+ @Override
+ public int read() throws IOException {
+ try {
+ if (currentPos < bufferOffset || currentPos > bufferLimit ||
bufferOffset >= bufferLimit) {
+ bufferOffset = currentPos;
+ fill();
+ }
+ if (currentPos > bufferLimit) {
+ LOG.warn("current pos {} is larger than buffer limit {}."
+ + " should not happen.", currentPos, bufferLimit);
+ return -1;
+ }
+
+ int pos = (int) (currentPos - bufferOffset);
+ int res = Byte.toUnsignedInt(tmpBuf[pos]);
+ ++currentPos;
+ return res;
+ } catch (BrokerReader.EOFException e) {
+ return -1;
+ }
+ }
+
+ @SuppressWarnings("NullableProblems")
+ @Override
+ public int read(byte[] b) throws IOException {
+ try {
+ byte[] data = reader.pread(fd, currentPos, b.length);
+ System.arraycopy(data, 0, b, 0, data.length);
+ currentPos += data.length;
+ return data.length;
+ } catch (BrokerReader.EOFException e) {
+ return -1;
+ }
+ }
+
+ @SuppressWarnings("NullableProblems")
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ try {
+ if (currentPos < bufferOffset || currentPos > bufferLimit ||
currentPos + len > bufferLimit) {
+ if (len > COPY_BUFFER_SIZE) {
+ // the data to be read is larger then max size of buffer.
+ // read it directly.
+ byte[] data = reader.pread(fd, currentPos, len);
+ System.arraycopy(data, 0, b, off, data.length);
+ currentPos += data.length;
+ return data.length;
+ }
+ // fill the buffer first
+ bufferOffset = currentPos;
+ fill();
+ }
+
+ if (currentPos > bufferLimit) {
+ LOG.warn("current pos {} is larger than buffer limit {}."
+ + " should not happen.", currentPos, bufferLimit);
+ return -1;
+ }
+
+ int start = (int) (currentPos - bufferOffset);
+ int readLen = Math.min(len, (int) (bufferLimit - bufferOffset));
+ System.arraycopy(tmpBuf, start, b, off, readLen);
+ currentPos += readLen;
+ return readLen;
+ } catch (BrokerReader.EOFException e) {
+ return -1;
+ }
+ }
+
+ private void fill() throws IOException, BrokerReader.EOFException {
+ if (bufferOffset == this.fileLength) {
+ throw new BrokerReader.EOFException();
+ }
+ byte[] data = reader.pread(fd, bufferOffset, COPY_BUFFER_SIZE);
+ System.arraycopy(data, 0, tmpBuf, 0, data.length);
+ bufferLimit = bufferOffset + data.length;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ final long left = fileLength - currentPos;
+ long min = Math.min(n, left);
+ currentPos += min;
+ return min;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close(fd);
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ markPos = currentPos;
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ currentPos = markPos;
+ }
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/IcebergBrokerIO.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/IcebergBrokerIO.java
new file mode 100644
index 00000000000..ec3b4f17030
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/IcebergBrokerIO.java
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.broker;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.datasource.HMSExternalCatalog;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableMap;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * FileIO implementation that uses broker to execute Iceberg files IO
operation.
+ */
+public class IcebergBrokerIO implements FileIO {
+
+ private SerializableMap<String, String> properties =
SerializableMap.copyOf(ImmutableMap.of());
+ private BrokerDesc brokerDesc = null;
+
+ @Override
+ public void initialize(Map<String, String> props) {
+ this.properties = SerializableMap.copyOf(props);
+ if (!properties.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
+ throw new UnsupportedOperationException(String.format("No broker
is specified, "
+ + "try to set '%s' in HMS Catalog",
HMSExternalCatalog.BIND_BROKER_NAME));
+ }
+ String brokerName =
properties.get(HMSExternalCatalog.BIND_BROKER_NAME);
+ this.brokerDesc = new BrokerDesc(brokerName,
properties.immutableMap());
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ return properties.immutableMap();
+ }
+
+ @Override
+ public InputFile newInputFile(String path) {
+ if (brokerDesc == null) {
+ throw new UnsupportedOperationException("IcebergBrokerIO should be
initialized first");
+ }
+ try {
+ return BrokerInputFile.create(path, brokerDesc);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public OutputFile newOutputFile(String path) {
+ throw new UnsupportedOperationException("IcebergBrokerIO does not
support writing files");
+ }
+
+ @Override
+ public void deleteFile(String path) {
+ throw new UnsupportedOperationException("IcebergBrokerIO does not
support deleting files");
+ }
+
+ @Override
+ public void close() { }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 9bae707c2f3..b54b05f47e1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -34,10 +34,12 @@ import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.AcidInfo;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.iceberg.IcebergSplit;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
@@ -313,7 +315,13 @@ public abstract class FileQueryScanNode extends
FileScanNode {
List<String> pathPartitionKeys = getPathPartitionKeys();
for (Split split : inputSplits) {
FileSplit fileSplit = (FileSplit) split;
- TFileType locationType =
getLocationType(fileSplit.getPath().toString());
+ TFileType locationType;
+ if (fileSplit instanceof IcebergSplit
+ && ((IcebergSplit)
fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
+ locationType = TFileType.FILE_BROKER;
+ } else {
+ locationType = getLocationType(fileSplit.getPath().toString());
+ }
TScanRangeLocations curLocations = newLocations();
// If fileSplit has partition values, use the values collected
from hive partitions.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
index 5f79623ff4a..91a208202d0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
@@ -201,15 +201,18 @@ public class IcebergMetadataCache {
HiveCatalog hiveCatalog = new HiveCatalog();
hiveCatalog.setConf(conf);
- Map<String, String> catalogProperties = new HashMap<>();
- catalogProperties.put(HMSProperties.HIVE_METASTORE_URIS, uri);
- catalogProperties.put("uri", uri);
- hiveCatalog.initialize("hive", catalogProperties);
-
+ if (props.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
+ props.put(HMSProperties.HIVE_METASTORE_URIS, uri);
+ props.put("uri", uri);
+ hiveCatalog.initialize("hive", props);
+ } else {
+ Map<String, String> catalogProperties = new HashMap<>();
+ catalogProperties.put(HMSProperties.HIVE_METASTORE_URIS, uri);
+ catalogProperties.put("uri", uri);
+ hiveCatalog.initialize("hive", catalogProperties);
+ }
Table table = HiveMetaStoreClientHelper.ugiDoAs(conf, () ->
hiveCatalog.loadTable(TableIdentifier.of(db, tbl)));
-
initIcebergTableFileIO(table, props);
-
return table;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]