This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ea4f7eba335 [fix][io] Fix the Alluxio sink to write messages
successfully after the first file rotation (#19247)
ea4f7eba335 is described below
commit ea4f7eba335e72e1d298ec127bc0d140be2b7be2
Author: Kengo Seki <[email protected]>
AuthorDate: Wed Jan 18 20:06:43 2023 +0900
[fix][io] Fix the Alluxio sink to write messages successfully after the
first file rotation (#19247)
---
.../apache/pulsar/io/alluxio/sink/AlluxioSink.java | 6 ++--
.../pulsar/io/alluxio/sink/AlluxioSinkTest.java | 39 +++++++++++++++++++---
2 files changed, 38 insertions(+), 7 deletions(-)
diff --git
a/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java
b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java
index e457d43d2b3..413f05e0e17 100644
---
a/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java
+++
b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java
@@ -67,7 +67,7 @@ public class AlluxioSink implements Sink<GenericObject> {
private FileSystem fileSystem;
private FileOutStream fileOutStream;
- private CreateFilePOptions.Builder optionsBuilder;
+
private long recordsNum;
private String tmpFilePath;
private String fileDirPath;
@@ -114,8 +114,6 @@ public class AlluxioSink implements Sink<GenericObject> {
fileSystem.createDirectory(tmpAlluxioDirPath);
}
- optionsBuilder =
FileSystemOptions.createFileDefaults(configuration).toBuilder();
-
recordsNum = 0;
recordsToAck = Lists.newArrayList();
tmpFilePath = "";
@@ -206,6 +204,8 @@ public class AlluxioSink implements Sink<GenericObject> {
}
private void createTmpFile() throws AlluxioException, IOException {
+ CreateFilePOptions.Builder optionsBuilder =
+
FileSystemOptions.createFileDefaults(configuration).toBuilder();
UUID id = UUID.randomUUID();
String fileExtension = alluxioSinkConfig.getFileExtension();
tmpFilePath = tmpFileDirPath + "/" + id.toString() + "_tmp" +
fileExtension;
diff --git
a/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java
b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java
index 5d464182616..9325a2255ab 100644
---
a/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java
+++
b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java
@@ -40,6 +40,7 @@ import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -125,6 +126,13 @@ public class AlluxioSinkTest {
when(mockRecord.getSchema()).thenAnswer((Answer<Schema<KeyValue<String,
Foobar>>>) invocation -> kvSchema);
}
+ @AfterMethod
+ public void tearDown() throws Exception {
+ if (cluster != null) {
+ cluster.stop();
+ }
+ }
+
@Test
public void openTest() throws Exception {
map.put("filePrefix", "TopicA");
@@ -147,7 +155,6 @@ public class AlluxioSinkTest {
Assert.assertTrue(client.exists(alluxioTmpURI));
sink.close();
- cluster.stop();
}
@Test
@@ -186,11 +193,36 @@ public class AlluxioSinkTest {
Assert.assertTrue(client.exists(alluxioTmpURI));
List<URIStatus> listAlluxioDirStatus = client.listStatus(alluxioURI);
-
List<String> pathList =
listAlluxioDirStatus.stream().map(URIStatus::getPath).collect(Collectors.toList());
-
Assert.assertEquals(pathList.size(), 2);
+ for (String path : pathList) {
+ if (path.contains("tmp")) {
+ // Ensure that the temporary file is rotated and the directory
is empty
+ Assert.assertEquals(path, "/pulsar/tmp");
+ } else {
+ // Ensure that all rotated files conform the naming convention
+ Assert.assertTrue(path.startsWith("/pulsar/TopicA-"));
+ }
+ }
+
+ // Ensure the subsequent writes are also successful
+ sink.write(() -> new GenericObject() {
+ @Override
+ public SchemaType getSchemaType() {
+ return SchemaType.KEY_VALUE;
+ }
+
+ @Override
+ public Object getNativeObject() {
+ return new KeyValue<>((String) fooBar.getField("address"),
fooBar);
+ }
+ });
+
+ listAlluxioDirStatus = client.listStatus(alluxioURI);
+ pathList =
listAlluxioDirStatus.stream().map(URIStatus::getPath).collect(Collectors.toList());
+ Assert.assertEquals(pathList.size(), 3);
+
for (String path : pathList) {
if (path.contains("tmp")) {
Assert.assertEquals(path, "/pulsar/tmp");
@@ -200,7 +232,6 @@ public class AlluxioSinkTest {
}
sink.close();
- cluster.stop();
}
private LocalAlluxioCluster setupSingleMasterCluster() throws Exception {