fuzing commented on issue #12895:
URL: https://github.com/apache/iceberg/issues/12895#issuecomment-2839697843
@jesumyip - sorry, a couple of other things, namely:
- You're resource capping the connect container at 2GB. Kafka Connect is
notoriously memory hungry. Depending upon your use-case, you may run into Java
VM memory exhaustion issues out of the gate. Make sure that you tune
KAFKA_HEAP_OPTS such that the heap is set a few GB below whatever has been
allocated to the container (we use a 32GB limited container with 28GB
allocated to the JVM heap like so: "-Xmx28G"). For laptop/toy/test
configuration I'd probably start with 6GB total, with 4GB allocated to JVM
heap (although this is quite lean for K/C).
- On startup, in distributed mode, the connector will sit there idle until
you prime the pump by injecting your config on the Connect REST port (see
example below, with <sections-between-angle-brackets> standing in for variables
we inject). This will kick off the process. Alternatively, in standalone mode
you can do all your config via the environment (but you will need to make the
appropriate changes to the container to get this working - personally I
wouldn't bother with standalone mode). Adjust commit threads based on #cores
allocated (2x #cores is usually the go-to, but often you can scale up from 2x
to keep your cores maxed out for high throughput; monitor via docker stats or
whatever you use). Configure transforms etc. per your use-case.
- The example below uses 4 tasks, and in our test setup each task ingests
from a pair of Kafka partitions (i.e. 8 total partitions).
```
{
  "tasks.max": "4",
  "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
  "topics": "<some-topic>",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable": "true",
  "value.converter.schemas.enable": "false",
  "iceberg.catalog": "iceberg",
  "iceberg.catalog.type": "rest",
  "iceberg.catalog.uri": "<catalog-uri>",
  "iceberg.catalog.s3.path-style-access": "true",
  "iceberg.tables.auto-create-enabled": "true",
  "iceberg.tables.evolve-schema-enabled": "true",
  "iceberg.control.commit.interval-ms": "300000",
  "iceberg.control.commit.timeout-ms": "30000",
  "iceberg.control.commit.threads": "8",
  "iceberg.catalog.warehouse": "<warehouse>",
  "iceberg.catalog.io-impl": "<implementation>",
  "iceberg.catalog.s3.endpoint": "<endpoint>",
  "iceberg.catalog.s3.access-key-id": "<key_id>",
  "iceberg.catalog.s3.secret-access-key": "<secret>",
  "iceberg.catalog.client.region": "<region>",
  "transforms": "jsontomap,tsconverter",
  "transforms.jsontomap.type": "org.apache.iceberg.connect.transforms.JsonToMapTransform",
  "transforms.jsontomap.json.root": "false",
  "transforms.tsconverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$$Value",
  "transforms.tsconverter.field": "timestamp",
  "transforms.tsconverter.unix.precision": "milliseconds",
  "transforms.tsconverter.target.type": "Timestamp",
  "sink.properties.strip_outer_array": "true"
}
```
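
For anyone wanting to script the above, here is a rough Python sketch of the
three knobs mentioned: heap sizing, commit threads, and pushing the config to
the Connect REST API. It's not what we run, just an illustration. The helper
names, the connector name "iceberg-sink", the file name "iceberg-sink.json" and
the URL http://localhost:8083 (Kafka Connect's default REST port) are all
assumptions; the headroom and the thread multiplier are inputs you pick
yourself. The only Connect API used is `PUT /connectors/<name>/config`, which
creates the connector if it doesn't exist and updates it otherwise.

```python
import json
import urllib.parse
import urllib.request


def heap_opts(container_gb: int, headroom_gb: int) -> str:
    """Build a KAFKA_HEAP_OPTS value that leaves headroom_gb below the container limit."""
    if headroom_gb <= 0 or headroom_gb >= container_gb:
        raise ValueError("headroom must be positive and smaller than the container limit")
    return f"-Xmx{container_gb - headroom_gb}G"


def commit_threads(cores: int, factor: int = 2) -> str:
    """Starting value for iceberg.control.commit.threads (2x cores by default)."""
    return str(cores * factor)


def build_config_request(connect_url: str, name: str, config: dict) -> urllib.request.Request:
    """Prepare the PUT /connectors/<name>/config request without sending it."""
    quoted = urllib.parse.quote(name, safe="")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors/{quoted}/config",
        data=json.dumps(config).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


def submit_config(connect_url: str, name: str, config: dict) -> str:
    """Send the config to a running Connect worker and return the response body."""
    req = build_config_request(connect_url, name, config)
    with urllib.request.urlopen(req) as resp:
        return resp.read().decode("utf-8")


# Hypothetical usage, assuming the JSON above is saved as iceberg-sink.json
# with the <placeholders> filled in:
#
#   with open("iceberg-sink.json") as f:
#       config = json.load(f)
#   config["iceberg.control.commit.threads"] = commit_threads(cores=4)
#   print(submit_config("http://localhost:8083", "iceberg-sink", config))
```

With the numbers from this comment, `heap_opts(32, 4)` gives "-Xmx28G" and
`heap_opts(6, 2)` gives "-Xmx4G". Treat the commit thread count as a starting
point and tune it while watching CPU usage.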