This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 08353f60278 [Enhance](fe) Iceberg table in HMS catalog supports broker 
scan (#28107)
08353f60278 is described below

commit 08353f6027890bf95338afbff95156c3f270023a
Author: DuRipeng <[email protected]>
AuthorDate: Wed Jan 3 11:29:12 2024 +0800

    [Enhance](fe) Iceberg table in HMS catalog supports broker scan (#28107)
    
    My organization uses HMS catalog to accelerate Lake query. Since we have a 
custom distributed file system that is hard to integrate into FE / BE, we 
introduced HMS Catalog broker scan support (#24830) and implemented custom 
distributed file system adaptation in the broker.
    
    We want to expand the scope of use to Iceberg table scan in HMS Catalog. 
This PR introduces the broker-scan-related `IcebergBrokerIO`, `BrokerInputFile`, 
and `BrokerInputStream` classes for Iceberg table scan.
---
 docs/en/docs/lakehouse/multi-catalog/hive.md       |   7 +
 docs/zh-CN/docs/lakehouse/multi-catalog/hive.md    |   6 +
 .../datasource/iceberg/broker/BrokerInputFile.java |  74 +++++++++
 .../iceberg/broker/BrokerInputStream.java          | 169 +++++++++++++++++++++
 .../datasource/iceberg/broker/IcebergBrokerIO.java |  80 ++++++++++
 .../doris/planner/external/FileQueryScanNode.java  |  10 +-
 .../external/iceberg/IcebergMetadataCache.java     |  17 ++-
 7 files changed, 355 insertions(+), 8 deletions(-)

diff --git a/docs/en/docs/lakehouse/multi-catalog/hive.md 
b/docs/en/docs/lakehouse/multi-catalog/hive.md
index 25fddea1250..01c78d3599f 100644
--- a/docs/en/docs/lakehouse/multi-catalog/hive.md
+++ b/docs/en/docs/lakehouse/multi-catalog/hive.md
@@ -391,6 +391,13 @@ Add following setting when creating an HMS catalog, file 
splitting and scanning
 "broker.name" = "test_broker"
 ```
 
+
+Doris has implemented Broker query support for HMS Catalog Iceberg based on 
the Iceberg `FileIO` interface. If needed, the following configuration can be 
added when creating the HMS Catalog.
+
+```sql
+"io-impl" = "org.apache.doris.datasource.iceberg.broker.IcebergBrokerIO"
+```
+
 ## Integrate with Apache Ranger
 
 Apache Ranger is a security framework for monitoring, enabling services, and 
comprehensive data security access management on the Hadoop platform.
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md 
b/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md
index e75977c25f4..b4efb291f89 100644
--- a/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md
+++ b/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md
@@ -373,6 +373,12 @@ CREATE CATALOG hive PROPERTIES (
 "broker.name" = "test_broker"
 ```
 
+Doris 基于 Iceberg `FileIO` 接口实现了 Broker 查询 HMS Catalog Iceberg 的支持。如有需求,可以在创建 
HMS Catalog 时增加如下配置。
+
+```sql
+"io-impl" = "org.apache.doris.datasource.iceberg.broker.IcebergBrokerIO"
+```
+
 ## 使用 Ranger 进行权限校验
 
 Apache Ranger是一个用来在Hadoop平台上进行监控,启用服务,以及全方位数据安全访问管理的安全框架。
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputFile.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputFile.java
new file mode 100644
index 00000000000..529df6c0af1
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputFile.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.broker;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.common.util.BrokerReader;
+import org.apache.doris.thrift.TBrokerFD;
+
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+
+import java.io.IOException;
+
+public class BrokerInputFile implements InputFile {
+
+    private Long fileLength = null;
+
+    private final String filePath;
+    private final BrokerDesc brokerDesc;
+    private BrokerReader reader;
+    private TBrokerFD fd;
+
+    private BrokerInputFile(String filePath, BrokerDesc brokerDesc) {
+        this.filePath = filePath;
+        this.brokerDesc = brokerDesc;
+    }
+
+    private void init() throws IOException {
+        this.reader = BrokerReader.create(this.brokerDesc);
+        this.fileLength = this.reader.getFileLength(filePath);
+        this.fd = this.reader.open(filePath);
+    }
+
+    public static BrokerInputFile create(String filePath, BrokerDesc 
brokerDesc) throws IOException {
+        BrokerInputFile inputFile = new BrokerInputFile(filePath, brokerDesc);
+        inputFile.init();
+        return inputFile;
+    }
+
+    @Override
+    public long getLength() {
+        return fileLength;
+    }
+
+    @Override
+    public SeekableInputStream newStream() {
+        return new BrokerInputStream(this.reader, this.fd, this.fileLength);
+    }
+
+    @Override
+    public String location() {
+        return filePath;
+    }
+
+    @Override
+    public boolean exists() {
+        return fileLength != null;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputStream.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputStream.java
new file mode 100644
index 00000000000..b9f6d30aed4
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/BrokerInputStream.java
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.broker;
+
+import org.apache.doris.common.util.BrokerReader;
+import org.apache.doris.thrift.TBrokerFD;
+
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+
+public class BrokerInputStream extends SeekableInputStream {
+    private static final Logger LOG = 
LogManager.getLogger(BrokerInputStream.class);
+    private static final int COPY_BUFFER_SIZE = 1024 * 1024; // 1MB
+
+    private final byte[] tmpBuf = new byte[COPY_BUFFER_SIZE];
+    private long currentPos = 0;
+    private long markPos = 0;
+
+    private long bufferOffset = 0;
+    private long bufferLimit = 0;
+    private final BrokerReader reader;
+    private final TBrokerFD fd;
+    private final long fileLength;
+
+    public BrokerInputStream(BrokerReader reader, TBrokerFD fd, long 
fileLength) {
+        this.fd = fd;
+        this.reader = reader;
+        this.fileLength = fileLength;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return currentPos;
+    }
+
+    @Override
+    public void seek(long newPos) throws IOException {
+        currentPos = newPos;
+    }
+
+    @Override
+    public int read() throws IOException {
+        try {
+            if (currentPos < bufferOffset || currentPos > bufferLimit || 
bufferOffset >= bufferLimit) {
+                bufferOffset = currentPos;
+                fill();
+            }
+            if (currentPos > bufferLimit) {
+                LOG.warn("current pos {} is larger than buffer limit {}."
+                        + " should not happen.", currentPos, bufferLimit);
+                return -1;
+            }
+
+            int pos = (int) (currentPos - bufferOffset);
+            int res = Byte.toUnsignedInt(tmpBuf[pos]);
+            ++currentPos;
+            return res;
+        } catch (BrokerReader.EOFException e) {
+            return -1;
+        }
+    }
+
+    @SuppressWarnings("NullableProblems")
+    @Override
+    public int read(byte[] b) throws IOException {
+        try {
+            byte[] data = reader.pread(fd, currentPos, b.length);
+            System.arraycopy(data, 0, b, 0, data.length);
+            currentPos += data.length;
+            return data.length;
+        } catch (BrokerReader.EOFException e) {
+            return -1;
+        }
+    }
+
+    @SuppressWarnings("NullableProblems")
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        try {
+            if (currentPos < bufferOffset || currentPos > bufferLimit || 
currentPos + len > bufferLimit) {
+                if (len > COPY_BUFFER_SIZE) {
+                    // the data to be read is larger then max size of buffer.
+                    // read it directly.
+                    byte[] data = reader.pread(fd, currentPos, len);
+                    System.arraycopy(data, 0, b, off, data.length);
+                    currentPos += data.length;
+                    return data.length;
+                }
+                // fill the buffer first
+                bufferOffset = currentPos;
+                fill();
+            }
+
+            if (currentPos > bufferLimit) {
+                LOG.warn("current pos {} is larger than buffer limit {}."
+                        + " should not happen.", currentPos, bufferLimit);
+                return -1;
+            }
+
+            int start = (int) (currentPos - bufferOffset);
+            int readLen = Math.min(len, (int) (bufferLimit - bufferOffset));
+            System.arraycopy(tmpBuf, start, b, off, readLen);
+            currentPos += readLen;
+            return readLen;
+        } catch (BrokerReader.EOFException e) {
+            return -1;
+        }
+    }
+
+    private void fill() throws IOException, BrokerReader.EOFException {
+        if (bufferOffset == this.fileLength) {
+            throw new BrokerReader.EOFException();
+        }
+        byte[] data = reader.pread(fd, bufferOffset, COPY_BUFFER_SIZE);
+        System.arraycopy(data, 0, tmpBuf, 0, data.length);
+        bufferLimit = bufferOffset + data.length;
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        final long left = fileLength - currentPos;
+        long min = Math.min(n, left);
+        currentPos += min;
+        return min;
+    }
+
+    @Override
+    public int available() throws IOException {
+        return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close(fd);
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+        markPos = currentPos;
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        currentPos = markPos;
+    }
+
+    @Override
+    public boolean markSupported() {
+        return true;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/IcebergBrokerIO.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/IcebergBrokerIO.java
new file mode 100644
index 00000000000..ec3b4f17030
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/broker/IcebergBrokerIO.java
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.broker;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.datasource.HMSExternalCatalog;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableMap;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * FileIO implementation that uses broker to execute Iceberg files IO 
operation.
+ */
+public class IcebergBrokerIO implements FileIO {
+
+    private SerializableMap<String, String> properties = 
SerializableMap.copyOf(ImmutableMap.of());
+    private BrokerDesc brokerDesc = null;
+
+    @Override
+    public void initialize(Map<String, String> props) {
+        this.properties = SerializableMap.copyOf(props);
+        if (!properties.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
+            throw new UnsupportedOperationException(String.format("No broker 
is specified, "
+                    + "try to set '%s' in HMS Catalog", 
HMSExternalCatalog.BIND_BROKER_NAME));
+        }
+        String brokerName = 
properties.get(HMSExternalCatalog.BIND_BROKER_NAME);
+        this.brokerDesc = new BrokerDesc(brokerName, 
properties.immutableMap());
+    }
+
+    @Override
+    public Map<String, String> properties() {
+        return properties.immutableMap();
+    }
+
+    @Override
+    public InputFile newInputFile(String path) {
+        if (brokerDesc == null) {
+            throw new UnsupportedOperationException("IcebergBrokerIO should be 
initialized first");
+        }
+        try {
+            return BrokerInputFile.create(path, brokerDesc);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public OutputFile newOutputFile(String path) {
+        throw new UnsupportedOperationException("IcebergBrokerIO does not 
support writing files");
+    }
+
+    @Override
+    public void deleteFile(String path) {
+        throw new UnsupportedOperationException("IcebergBrokerIO does not 
support deleting files");
+    }
+
+    @Override
+    public void close() { }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 9bae707c2f3..b54b05f47e1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -34,10 +34,12 @@ import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.AcidInfo;
 import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.iceberg.IcebergSplit;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
@@ -313,7 +315,13 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
         List<String> pathPartitionKeys = getPathPartitionKeys();
         for (Split split : inputSplits) {
             FileSplit fileSplit = (FileSplit) split;
-            TFileType locationType = 
getLocationType(fileSplit.getPath().toString());
+            TFileType locationType;
+            if (fileSplit instanceof IcebergSplit
+                    && ((IcebergSplit) 
fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
+                locationType = TFileType.FILE_BROKER;
+            } else {
+                locationType = getLocationType(fileSplit.getPath().toString());
+            }
 
             TScanRangeLocations curLocations = newLocations();
             // If fileSplit has partition values, use the values collected 
from hive partitions.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
index 5f79623ff4a..91a208202d0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
@@ -201,15 +201,18 @@ public class IcebergMetadataCache {
         HiveCatalog hiveCatalog = new HiveCatalog();
         hiveCatalog.setConf(conf);
 
-        Map<String, String> catalogProperties = new HashMap<>();
-        catalogProperties.put(HMSProperties.HIVE_METASTORE_URIS, uri);
-        catalogProperties.put("uri", uri);
-        hiveCatalog.initialize("hive", catalogProperties);
-
+        if (props.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
+            props.put(HMSProperties.HIVE_METASTORE_URIS, uri);
+            props.put("uri", uri);
+            hiveCatalog.initialize("hive", props);
+        } else {
+            Map<String, String> catalogProperties = new HashMap<>();
+            catalogProperties.put(HMSProperties.HIVE_METASTORE_URIS, uri);
+            catalogProperties.put("uri", uri);
+            hiveCatalog.initialize("hive", catalogProperties);
+        }
         Table table = HiveMetaStoreClientHelper.ugiDoAs(conf, () -> 
hiveCatalog.loadTable(TableIdentifier.of(db, tbl)));
-
         initIcebergTableFileIO(table, props);
-
         return table;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to