This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 187461e2fd7 [Fix](Export) Export deletes existing files multiple times when the `delete_existing_files` property is specified () (#39304)
187461e2fd7 is described below
commit 187461e2fd74db2ba61fe417e195489f9ca4a6ef
Author: Tiewei Fang <[email protected]>
AuthorDate: Tue Aug 13 22:26:02 2024 +0800
[Fix](Export) Export deletes existing files multiple times when the `delete_existing_files` property is specified () (#39304)
bp: #38400
When the `Export` statement specifies the `delete_existing_files` property, each `Outfile` statement generated by the `Export` carries this property. As a result, every `Outfile` statement deletes the existing files, so only the output of the last `Outfile` statement is retained.
To fix this, we add an RPC method that deletes the existing files once for the `Export` statement, and the `Outfile` statements generated by the `Export` no longer carry the `delete_existing_files` property.
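For illustration, here is a minimal Java sketch of the one-shot cleanup on the FE side. The class and method names in the sketch are hypothetical; `Config.enable_delete_existing_files`, `ExportJob`, `AnalysisException`, and `BrokerUtil.deleteDirectoryWithFileSystem` come from the diff below, and the actual backported code lives in `ExportMgr.java`.

```java
// Sketch only: a simplified view of the change, not the exact backported code.
// The class and method names here are made up for illustration.
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.load.ExportJob;

public class ExportCleanupSketch {
    // Delete the export target directory once, when the export job is registered,
    // instead of letting every generated Outfile statement delete it again.
    public static void deleteExistingFilesOnce(ExportJob job) throws UserException {
        if (Config.enable_delete_existing_files
                && Boolean.parseBoolean(job.getDeleteExistingFiles())) {
            if (job.getBrokerDesc() == null) {
                throw new AnalysisException("Local file system does not support delete existing files");
            }
            String fullPath = job.getExportPath();
            // Keep only the directory part of the export path before deleting.
            String exportDir = fullPath.substring(0, fullPath.lastIndexOf('/') + 1);
            BrokerUtil.deleteDirectoryWithFileSystem(exportDir, job.getBrokerDesc());
        }
        // The Outfile statements generated afterwards no longer carry
        // the delete_existing_files property (see the ExportJob.java hunk below).
    }
}
```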
## Proposed changes
Issue Number: close #xxx
---
be/src/service/internal_service.cpp | 6 +++---
.../org/apache/doris/common/util/BrokerUtil.java | 12 ++++++++++-
.../main/java/org/apache/doris/load/ExportJob.java | 4 +---
.../main/java/org/apache/doris/load/ExportMgr.java | 10 ++++++++++
.../doris/load/loadv2/SparkEtlJobHandler.java | 2 +-
.../apache/doris/load/loadv2/SparkRepository.java | 2 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 23 +++++-----------------
.../apache/doris/common/util/BrokerUtilTest.java | 2 +-
.../doris/load/loadv2/SparkEtlJobHandlerTest.java | 2 +-
gensrc/proto/internal_service.proto | 3 ---
10 files changed, 34 insertions(+), 32 deletions(-)
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 3be1d2eefdf..29d9e9ad363 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -678,7 +678,7 @@ void PInternalServiceImpl::outfile_write_success(google::protobuf::RpcController
     uint32_t len = request->result_file_sink().size();
     st = deserialize_thrift_msg(buf, &len, false, &result_file_sink);
     if (!st.ok()) {
-        LOG(WARNING) << "outfile write success filefailed, errmsg=" << st;
+        LOG(WARNING) << "outfile write success file failed, errmsg = " << st;
         st.to_protobuf(result->mutable_status());
         return;
     }
@@ -697,7 +697,7 @@ void PInternalServiceImpl::outfile_write_success(google::protobuf::RpcController
     bool exists = true;
     st = io::global_local_filesystem()->exists(file_name, &exists);
     if (!st.ok()) {
-        LOG(WARNING) << "outfile write success filefailed, errmsg=" << st;
+        LOG(WARNING) << "outfile write success filefailed, errmsg = " << st;
         st.to_protobuf(result->mutable_status());
         return;
     }
@@ -705,7 +705,7 @@ void PInternalServiceImpl::outfile_write_success(google::protobuf::RpcController
         st = Status::InternalError("File already exists: {}", file_name);
     }
     if (!st.ok()) {
-        LOG(WARNING) << "outfile write success filefailed, errmsg=" << st;
+        LOG(WARNING) << "outfile write success file failed, errmsg = " << st;
         st.to_protobuf(result->mutable_status());
         return;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index d32a04331b5..c5a2803b848 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -107,6 +107,16 @@ public class BrokerUtil {
         }
     }
 
+    public static void deleteDirectoryWithFileSystem(String path, BrokerDesc brokerDesc) throws UserException {
+        RemoteFileSystem fileSystem = FileSystemFactory.get(
+                brokerDesc.getName(), brokerDesc.getStorageType(), brokerDesc.getProperties());
+        Status st = fileSystem.deleteDirectory(path);
+        if (!st.ok()) {
+            throw new UserException(brokerDesc.getName() + " delete directory exception. path="
+                    + path + ", err: " + st.getErrMsg());
+        }
+    }
+
     public static String printBroker(String brokerName, TNetworkAddress address) {
         return brokerName + "[" + address.toString() + "]";
     }
@@ -358,7 +368,7 @@ public class BrokerUtil {
      * @param brokerDesc
      * @throws UserException if broker op failed
      */
-    public static void deletePath(String path, BrokerDesc brokerDesc) throws UserException {
+    public static void deletePathWithBroker(String path, BrokerDesc brokerDesc) throws UserException {
         TNetworkAddress address = getAddress(brokerDesc);
         TPaloBrokerService.Client client = borrowClient(address);
         boolean failed = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 4b5f1087b04..303887875eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -634,9 +634,7 @@ public class ExportJob implements Writable {
         if (!maxFileSize.isEmpty()) {
             outfileProperties.put(OutFileClause.PROP_MAX_FILE_SIZE, maxFileSize);
         }
-        if (!deleteExistingFiles.isEmpty()) {
-            outfileProperties.put(OutFileClause.PROP_DELETE_EXISTING_FILES, deleteExistingFiles);
-        }
+
         outfileProperties.put(OutFileClause.PROP_WITH_BOM, withBom);
 
         // broker properties
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index 7c2351fba5a..7439fd89aa4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.PatternMatcher;
 import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.common.util.OrderByPair;
 import org.apache.doris.common.util.TimeUtils;
@@ -103,6 +104,15 @@ public class ExportMgr {
                 throw new LabelAlreadyUsedException(job.getLabel());
             }
             unprotectAddJob(job);
+            // delete existing files
+            if (Config.enable_delete_existing_files && Boolean.parseBoolean(job.getDeleteExistingFiles())) {
+                if (job.getBrokerDesc() == null) {
+                    throw new AnalysisException("Local file system does not support delete existing files");
+                }
+                String fullPath = job.getExportPath();
+                BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, fullPath.lastIndexOf('/') + 1),
+                        job.getBrokerDesc());
+            }
             job.getTaskExecutors().forEach(executor -> {
                 Long taskId = Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
                 job.getTaskIdToExecutor().put(taskId, executor);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index 75e3a6e1718..69a41bd1283 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -365,7 +365,7 @@ public class SparkEtlJobHandler {
 
     public void deleteEtlOutputPath(String outputPath, BrokerDesc brokerDesc) {
         try {
-            BrokerUtil.deletePath(outputPath, brokerDesc);
+            BrokerUtil.deletePathWithBroker(outputPath, brokerDesc);
             LOG.info("delete path success. path: {}", outputPath);
         } catch (UserException e) {
             LOG.warn("delete path failed. path: {}", outputPath, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java
index 4efd2d17279..19b21ff11fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java
@@ -166,7 +166,7 @@ public class SparkRepository {
         try {
             String remoteArchivePath = getRemoteArchivePath(currentDppVersion);
             if (isReplace) {
-                BrokerUtil.deletePath(remoteArchivePath, brokerDesc);
+                BrokerUtil.deletePathWithBroker(remoteArchivePath, brokerDesc);
                 currentArchive.libraries.clear();
             }
             String srcFilePath = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index f1ccfb0563e..2cdb3313294 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -153,16 +153,12 @@ import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTable
 import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
-import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.planner.GroupCommitScanNode;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.OriginalPlanner;
-import org.apache.doris.planner.PlanFragment;
-import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.planner.PlanNode;
 import org.apache.doris.planner.Planner;
-import org.apache.doris.planner.ResultFileSink;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.proto.Data;
 import org.apache.doris.proto.InternalService;
@@ -172,7 +168,6 @@ import org.apache.doris.proto.InternalService.POutfileWriteSuccessResult;
 import org.apache.doris.proto.Types;
 import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
 import org.apache.doris.qe.ConnectContext.ConnectType;
-import org.apache.doris.qe.Coordinator.FragmentExecParams;
 import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
 import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.cache.Cache;
@@ -1892,26 +1887,18 @@ public class StmtExecutor {
         TResultFileSinkOptions sinkOptions = outFileClause.toSinkOptions();
 
         // 2. set brokerNetAddress
-        List<PlanFragment> fragments = coord.getFragments();
-        Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap = coord.getFragmentExecParamsMap();
-        PlanFragmentId topId = fragments.get(0).getFragmentId();
-        FragmentExecParams topParams = fragmentExecParamsMap.get(topId);
-        DataSink topDataSink = topParams.fragment.getSink();
-        TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
-        if (topDataSink instanceof ResultFileSink
-                && ((ResultFileSink) topDataSink).getStorageType() == StorageBackend.StorageType.BROKER) {
+        StorageType storageType = outFileClause.getBrokerDesc() == null
+                ? StorageBackend.StorageType.LOCAL : outFileClause.getBrokerDesc().getStorageType();
+        if (storageType == StorageType.BROKER) {
             // set the broker address for OUTFILE sink
-            ResultFileSink topResultFileSink = (ResultFileSink) topDataSink;
-            FsBroker broker = Env.getCurrentEnv().getBrokerMgr()
-                    .getBroker(topResultFileSink.getBrokerName(), execBeAddr.getHostname());
+            String brokerName = outFileClause.getBrokerDesc().getName();
+            FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyBroker(brokerName);
             sinkOptions.setBrokerAddresses(Lists.newArrayList(new TNetworkAddress(broker.host, broker.port)));
         }
 
         // 3. set TResultFileSink properties
         TResultFileSink sink = new TResultFileSink();
         sink.setFileOptions(sinkOptions);
-        StorageType storageType = outFileClause.getBrokerDesc() == null
-                ? StorageBackend.StorageType.LOCAL : outFileClause.getBrokerDesc().getStorageType();
         sink.setStorageBackendType(storageType.toThrift());
 
         // 4. get BE
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java
index ceae283a2fd..13bbd66fab1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java
@@ -318,7 +318,7 @@ public class BrokerUtilTest {
 
         try {
             BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap());
-            BrokerUtil.deletePath("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9", brokerDesc);
+            BrokerUtil.deletePathWithBroker("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9", brokerDesc);
         } catch (Exception e) {
             Assert.fail(e.getMessage());
         }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
index 5ecfa2e2d64..d1b6c786441 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
@@ -418,7 +418,7 @@ public class SparkEtlJobHandlerTest {
     public void testDeleteEtlOutputPath(@Mocked BrokerUtil brokerUtil) throws UserException {
         new Expectations() {
             {
-                BrokerUtil.deletePath(etlOutputPath, (BrokerDesc) any);
+                BrokerUtil.deletePathWithBroker(etlOutputPath, (BrokerDesc) any);
                 times = 1;
             }
         };
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 9b2f106a1f4..1d68d60aa61 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -688,9 +688,6 @@ message PFetchTableSchemaResult {
 }
 
 message POutfileWriteSuccessRequest {
-    // optional string file_path = 1;
-    // optional string success_file_name = 2;
-    // map<string, string> broker_properties = 4; // only for remote file
     optional bytes result_file_sink = 1;
 }
 
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]