This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a4db64d7c7 [Improve][E2E] Support windows for the e2e of paimon (#7329)
a4db64d7c7 is described below
commit a4db64d7c76f07daaf8030011ee5e8552396acd2
Author: zhangdonghao <[email protected]>
AuthorDate: Wed Aug 7 11:52:19 2024 +0800
[Improve][E2E] Support windows for the e2e of paimon (#7329)
---
.../e2e/connector/paimon/PaimonSinkCDCIT.java | 53 +++++++++++++++++++---
1 file changed, 46 insertions(+), 7 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index c899dd0e8b..4b1d7dd86c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.e2e.connector.paimon;
import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.core.starter.utils.CompressionUtils;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
@@ -25,6 +26,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.commons.compress.archivers.ArchiveException;
import org.apache.commons.lang3.StringUtils;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
@@ -52,6 +54,7 @@ import org.testcontainers.containers.Container;
import lombok.extern.slf4j.Slf4j;
+import java.io.File;
import java.io.IOException;
import java.time.LocalDate;
import java.util.ArrayList;
@@ -68,7 +71,8 @@ import static org.awaitility.Awaitility.given;
"Spark and Flink engine can not auto create paimon table on
worker node in local file(e.g flink tm) by savemode feature which can lead
error")
@Slf4j
public class PaimonSinkCDCIT extends TestSuiteBase implements TestResource {
- private static final String CATALOG_ROOT_DIR = "/tmp/";
+
+ private static String CATALOG_ROOT_DIR = "/tmp/";
private static final String NAMESPACE = "paimon";
private static final String NAMESPACE_TAR = "paimon.tar.gz";
private static final String CATALOG_DIR = CATALOG_ROOT_DIR + NAMESPACE +
"/";
@@ -77,10 +81,18 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
private static final String FAKE_DATABASE1 = "FakeDatabase1";
private static final String FAKE_TABLE2 = "FakeTable1";
private static final String FAKE_DATABASE2 = "FakeDatabase2";
+ private String CATALOG_ROOT_DIR_WIN = "C:/Users/";
+ private String CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
+ private boolean isWindows;
@BeforeAll
@Override
- public void startUp() throws Exception {}
+ public void startUp() throws Exception {
+ this.isWindows =
+
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
+ CATALOG_ROOT_DIR_WIN = CATALOG_ROOT_DIR_WIN +
System.getProperty("user.name") + "/tmp/";
+ CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
+ }
@AfterAll
@Override
@@ -498,8 +510,15 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
protected final ContainerExtendedFactory containerExtendedFactory =
container -> {
- FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR);
- FileUtils.createNewDir(CATALOG_DIR);
+ if (isWindows) {
+ FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR);
+ FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + "paimon.tar");
+ FileUtils.createNewDir(CATALOG_ROOT_DIR_WIN);
+ } else {
+ FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR);
+ FileUtils.createNewDir(CATALOG_DIR);
+ }
+
container.execInContainer(
"sh",
"-c",
@@ -510,8 +529,13 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
+ " "
+ NAMESPACE);
container.copyFileFromContainer(
- CATALOG_ROOT_DIR + NAMESPACE_TAR, CATALOG_ROOT_DIR +
NAMESPACE_TAR);
- extractFiles();
+ CATALOG_ROOT_DIR + NAMESPACE_TAR,
+ (isWindows ? CATALOG_ROOT_DIR_WIN : CATALOG_ROOT_DIR)
+ NAMESPACE_TAR);
+ if (isWindows) {
+ extractFilesWin();
+ } else {
+ extractFiles();
+ }
};
private void extractFiles() {
@@ -532,6 +556,17 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
}
}
+ private void extractFilesWin() {
+ try {
+ CompressionUtils.unGzip(
+ new File(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR), new
File(CATALOG_ROOT_DIR_WIN));
+ CompressionUtils.unTar(
+ new File(CATALOG_ROOT_DIR_WIN + "paimon.tar"), new
File(CATALOG_ROOT_DIR_WIN));
+ } catch (IOException | ArchiveException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private List<PaimonRecord> loadPaimonData(String dbName, String tbName)
throws Exception {
Table table = getTable(dbName, tbName);
ReadBuilder readBuilder = table.newReadBuilder();
@@ -575,7 +610,11 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
private Catalog getCatalog() {
Options options = new Options();
- options.set("warehouse", "file://" + CATALOG_DIR);
+ if (isWindows) {
+ options.set("warehouse", "file://" + CATALOG_DIR_WIN);
+ } else {
+ options.set("warehouse", "file://" + CATALOG_DIR);
+ }
Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(options));
return catalog;
}