This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ea4f7eba335 [fix][io] Fix the Alluxio sink to write messages 
successfully after the first file rotation (#19247)
ea4f7eba335 is described below

commit ea4f7eba335e72e1d298ec127bc0d140be2b7be2
Author: Kengo Seki <[email protected]>
AuthorDate: Wed Jan 18 20:06:43 2023 +0900

    [fix][io] Fix the Alluxio sink to write messages successfully after the 
first file rotation (#19247)
---
 .../apache/pulsar/io/alluxio/sink/AlluxioSink.java |  6 ++--
 .../pulsar/io/alluxio/sink/AlluxioSinkTest.java    | 39 +++++++++++++++++++---
 2 files changed, 38 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java
 
b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java
index e457d43d2b3..413f05e0e17 100644
--- 
a/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java
+++ 
b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java
@@ -67,7 +67,7 @@ public class AlluxioSink implements Sink<GenericObject> {
 
     private FileSystem fileSystem;
     private FileOutStream fileOutStream;
-    private CreateFilePOptions.Builder optionsBuilder;
+
     private long recordsNum;
     private String tmpFilePath;
     private String fileDirPath;
@@ -114,8 +114,6 @@ public class AlluxioSink implements Sink<GenericObject> {
             fileSystem.createDirectory(tmpAlluxioDirPath);
         }
 
-        optionsBuilder = 
FileSystemOptions.createFileDefaults(configuration).toBuilder();
-
         recordsNum = 0;
         recordsToAck = Lists.newArrayList();
         tmpFilePath = "";
@@ -206,6 +204,8 @@ public class AlluxioSink implements Sink<GenericObject> {
     }
 
     private void createTmpFile() throws AlluxioException, IOException {
+        CreateFilePOptions.Builder optionsBuilder =
+                
FileSystemOptions.createFileDefaults(configuration).toBuilder();
         UUID id = UUID.randomUUID();
         String fileExtension = alluxioSinkConfig.getFileExtension();
         tmpFilePath = tmpFileDirPath + "/" + id.toString() + "_tmp" + 
fileExtension;
diff --git 
a/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java
 
b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java
index 5d464182616..9325a2255ab 100644
--- 
a/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java
+++ 
b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java
@@ -40,6 +40,7 @@ import org.mockito.Mock;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -125,6 +126,13 @@ public class AlluxioSinkTest {
         
when(mockRecord.getSchema()).thenAnswer((Answer<Schema<KeyValue<String, 
Foobar>>>) invocation -> kvSchema);
     }
 
+    @AfterMethod
+    public void tearDown() throws Exception {
+        if (cluster != null) {
+            cluster.stop();
+        }
+    }
+
     @Test
     public void openTest() throws Exception {
         map.put("filePrefix", "TopicA");
@@ -147,7 +155,6 @@ public class AlluxioSinkTest {
         Assert.assertTrue(client.exists(alluxioTmpURI));
 
         sink.close();
-        cluster.stop();
     }
 
     @Test
@@ -186,11 +193,36 @@ public class AlluxioSinkTest {
         Assert.assertTrue(client.exists(alluxioTmpURI));
 
         List<URIStatus> listAlluxioDirStatus = client.listStatus(alluxioURI);
-
         List<String> pathList = 
listAlluxioDirStatus.stream().map(URIStatus::getPath).collect(Collectors.toList());
-
         Assert.assertEquals(pathList.size(), 2);
 
+        for (String path : pathList) {
+            if (path.contains("tmp")) {
+                // Ensure that the temporary file is rotated and the directory 
is empty
+                Assert.assertEquals(path, "/pulsar/tmp");
+            } else {
+                // Ensure that all rotated files conform the naming convention
+                Assert.assertTrue(path.startsWith("/pulsar/TopicA-"));
+            }
+        }
+
+        // Ensure the subsequent writes are also successful
+        sink.write(() -> new GenericObject() {
+            @Override
+            public SchemaType getSchemaType() {
+                return SchemaType.KEY_VALUE;
+            }
+
+            @Override
+            public Object getNativeObject() {
+                return new KeyValue<>((String) fooBar.getField("address"), 
fooBar);
+            }
+        });
+
+        listAlluxioDirStatus = client.listStatus(alluxioURI);
+        pathList = 
listAlluxioDirStatus.stream().map(URIStatus::getPath).collect(Collectors.toList());
+        Assert.assertEquals(pathList.size(), 3);
+
         for (String path : pathList) {
             if (path.contains("tmp")) {
                 Assert.assertEquals(path, "/pulsar/tmp");
@@ -200,7 +232,6 @@ public class AlluxioSinkTest {
         }
 
         sink.close();
-        cluster.stop();
     }
 
     private LocalAlluxioCluster setupSingleMasterCluster() throws Exception {

Reply via email to