pandaapo commented on code in PR #4650:
URL: https://github.com/apache/eventmesh/pull/4650#discussion_r1440237503


##########
eventmesh-connectors/eventmesh-connector-file/src/main/resources/source-config.yml:
##########
@@ -15,20 +15,13 @@
 # limitations under the License.
 #
 
-pubSubConfig:
-    meshAddress: 127.0.0.1:10000
-    subject: TopicTest
-    idc: FT
-    env: PRD
-    group: fileSource
-    appId: 5032
-    userName: fileSourceUser
-    passWord: filePassWord
 connectorConfig:
     connectorName: fileSource
     nameserver: 127.0.0.1:9877
     topic: TopicTest
     commitOffsetIntervalMs: 5000
+    fileName: TopicTest-%s
+    filePath: TopicTest/%s

Review Comment:
   When users use this File Source Connector, they are required to place local 
files in directories structured like XXX/2024/01/03. I think it is not 
user-friendly. Why not directly use the filePath and fileName configured by the 
user here to create input stream?



##########
eventmesh-connectors/eventmesh-connector-file/src/main/resources/source-config.yml:
##########
@@ -15,20 +15,13 @@
 # limitations under the License.
 #
 
-pubSubConfig:
-    meshAddress: 127.0.0.1:10000
-    subject: TopicTest
-    idc: FT
-    env: PRD
-    group: fileSource
-    appId: 5032
-    userName: fileSourceUser
-    passWord: filePassWord
 connectorConfig:
     connectorName: fileSource
     nameserver: 127.0.0.1:9877
     topic: TopicTest
     commitOffsetIntervalMs: 5000
+    fileName: TopicTest-%s
+    filePath: TopicTest/%s
 offsetStorageConfig:
     offsetStorageType: nacos
     offsetStorageAddr: 127.0.0.1:8848

Review Comment:
   Regarding these offset configurations, as I mentioned before, the File 
Source Connector you implemented reads all the content of the file at once, and 
offsets seem to have no effect. Could `offsetStorageConfig` be removed?



##########
eventmesh-connectors/eventmesh-connector-file/src/main/resources/source-config.yml:
##########
@@ -15,20 +15,13 @@
 # limitations under the License.
 #
 
-pubSubConfig:

Review Comment:
   Why was `pubSubConfig` configuration removed? With this configuration 
removed, this Source Connector will no longer work properly.



##########
eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java:
##########
@@ -73,12 +93,48 @@ public String name() {
 
     @Override
     public void stop() {
-
+        try {
+            if (bufferedReader != null) {
+                bufferedReader.close();
+            }
+        } catch (Exception e) {
+            log.error("Error closing resources: {}", e.getMessage());
+        }
     }
 
     @Override
     public List<ConnectRecord> poll() {
-        return null;
+        List<ConnectRecord> connectRecords = new ArrayList<>();
+        try {
+            String line;
+            while ((line = bufferedReader.readLine()) != null) {
+                ConnectRecord connectRecord = new ConnectRecord(new 
RecordPartition(), new RecordOffset(), System.currentTimeMillis(), line);

Review Comment:
   The offset recording seems redundant because the Source Connector you've 
implemented reads the entire contents of a file in a single pass and does not 
perform incremental reads.



##########
eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/config/FileSourceConfig.java:
##########
@@ -21,8 +21,10 @@
 
 import lombok.Data;
 import lombok.EqualsAndHashCode;
+import lombok.extern.slf4j.Slf4j;
 
 @Data
+@Slf4j
 @EqualsAndHashCode(callSuper = true)

Review Comment:
   Where do these two annotations work?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to