pandaapo commented on code in PR #4650:
URL: https://github.com/apache/eventmesh/pull/4650#discussion_r1440237503
##########
eventmesh-connectors/eventmesh-connector-file/src/main/resources/source-config.yml:
##########
@@ -15,20 +15,13 @@
# limitations under the License.
#
-pubSubConfig:
- meshAddress: 127.0.0.1:10000
- subject: TopicTest
- idc: FT
- env: PRD
- group: fileSource
- appId: 5032
- userName: fileSourceUser
- passWord: filePassWord
connectorConfig:
connectorName: fileSource
nameserver: 127.0.0.1:9877
topic: TopicTest
commitOffsetIntervalMs: 5000
+ fileName: TopicTest-%s
+ filePath: TopicTest/%s
Review Comment:
When users use this File Source Connector, they are required to place local
files in directories structured like XXX/2024/01/03. I think it is not
user-friendly. Why not directly use the filePath and fileName configured by the
user here to create input stream?
##########
eventmesh-connectors/eventmesh-connector-file/src/main/resources/source-config.yml:
##########
@@ -15,20 +15,13 @@
# limitations under the License.
#
-pubSubConfig:
- meshAddress: 127.0.0.1:10000
- subject: TopicTest
- idc: FT
- env: PRD
- group: fileSource
- appId: 5032
- userName: fileSourceUser
- passWord: filePassWord
connectorConfig:
connectorName: fileSource
nameserver: 127.0.0.1:9877
topic: TopicTest
commitOffsetIntervalMs: 5000
+ fileName: TopicTest-%s
+ filePath: TopicTest/%s
offsetStorageConfig:
offsetStorageType: nacos
offsetStorageAddr: 127.0.0.1:8848
Review Comment:
Regarding these offset configurations, as I mentioned before, the File
Source Connector you implemented reads all the content of the file at once, and
offsets seem to have no effect. Could `offsetStorageConfig` be removed?
##########
eventmesh-connectors/eventmesh-connector-file/src/main/resources/source-config.yml:
##########
@@ -15,20 +15,13 @@
# limitations under the License.
#
-pubSubConfig:
Review Comment:
Why was `pubSubConfig` configuration removed? With this configuration
removed, this Source Connector will no longer work properly.
##########
eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java:
##########
@@ -73,12 +93,48 @@ public String name() {
@Override
public void stop() {
-
+ try {
+ if (bufferedReader != null) {
+ bufferedReader.close();
+ }
+ } catch (Exception e) {
+ log.error("Error closing resources: {}", e.getMessage());
+ }
}
@Override
public List<ConnectRecord> poll() {
- return null;
+ List<ConnectRecord> connectRecords = new ArrayList<>();
+ try {
+ String line;
+ while ((line = bufferedReader.readLine()) != null) {
+ ConnectRecord connectRecord = new ConnectRecord(new
RecordPartition(), new RecordOffset(), System.currentTimeMillis(), line);
Review Comment:
The offset recording seems redundant because the Source Connector you've
implemented reads the entire contents of a file in a single pass and does not
perform incremental reads.
##########
eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/config/FileSourceConfig.java:
##########
@@ -21,8 +21,10 @@
import lombok.Data;
import lombok.EqualsAndHashCode;
+import lombok.extern.slf4j.Slf4j;
@Data
+@Slf4j
@EqualsAndHashCode(callSuper = true)
Review Comment:
Where do these two annotations work?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]