This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git

The following commit(s) were added to refs/heads/main by this push:
     new 5439cbdb27  Kafka Connect: Docs on configuring the sink (#10746)
5439cbdb27 is described below

commit 5439cbdb278232779fdd9a392bbf57f007f9bda0
Author: Bryan Keller <brya...@gmail.com>
AuthorDate: Tue Sep 10 09:42:31 2024 -0700

    Kafka Connect: Docs on configuring the sink (#10746)

    * Kafka Connect: Docs on configuring the sink
    * Update docs/docs/kafka-connect.md
      Co-authored-by: Daniel Weeks <daniel.we...@databricks.com>
    * correct docs
    * added blurb about GCS ADC
    * add note about Kafka 2.5 requirement
    * Update docs/docs/kafka-connect.md
      Co-authored-by: Ajantha Bhat <ajanthab...@gmail.com>
    * Update docs/docs/kafka-connect.md
      Co-authored-by: Ajantha Bhat <ajanthab...@gmail.com>
    * Update docs/docs/kafka-connect.md
      Co-authored-by: Ajantha Bhat <ajanthab...@gmail.com>
    * document force lowercase config
    * Revert "document force lowercase config"
      This reverts commit 4213a0dcf20eeca9bb609a762c5536df19e30400.

    ---------

    Co-authored-by: Daniel Weeks <daniel.we...@databricks.com>
    Co-authored-by: Ajantha Bhat <ajanthab...@gmail.com>
---
 docs/docs/kafka-connect.md | 352 +++++++++++++++++++++++++++++++++++++++++++++
 docs/mkdocs.yml            |   1 +
 2 files changed, 353 insertions(+)

diff --git a/docs/docs/kafka-connect.md b/docs/docs/kafka-connect.md
new file mode 100644
index 0000000000..a904a17a99
--- /dev/null
+++ b/docs/docs/kafka-connect.md
@@ -0,0 +1,352 @@
---
title: "Kafka Connect"
---
<!--
 - Licensed to the Apache Software Foundation (ASF) under one or more
 - contributor license agreements. See the NOTICE file distributed with
 - this work for additional information regarding copyright ownership.
 - The ASF licenses this file to You under the Apache License, Version 2.0
 - (the "License"); you may not use this file except in compliance with
 - the License. You may obtain a copy of the License at
 -
 -   http://www.apache.org/licenses/LICENSE-2.0
 -
 - Unless required by applicable law or agreed to in writing, software
 - distributed under the License is distributed on an "AS IS" BASIS,
 - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 - See the License for the specific language governing permissions and
 - limitations under the License.
 -->

# Kafka Connect

[Kafka Connect](https://docs.confluent.io/platform/current/connect/index.html) is a popular framework for moving data
in and out of Kafka via connectors. There are many different connectors available, such as the S3 sink
for writing data from Kafka to S3 and Debezium source connectors for writing change data capture records from relational
databases to Kafka.

Kafka Connect has a straightforward, decentralized, distributed architecture. A cluster consists of a number of worker processes,
and a connector runs tasks on these processes to perform the work. Connector deployment is configuration driven, so
generally no code needs to be written to run a connector.

## Apache Iceberg Sink Connector

The Apache Iceberg Sink Connector for Kafka Connect is a sink connector for writing data from Kafka into Iceberg tables.

## Features

* Commit coordination for centralized Iceberg commits
* Exactly-once delivery semantics
* Multi-table fan-out
* Automatic table creation and schema evolution
* Field name mapping via Iceberg’s column mapping functionality

## Installation

The connector zip archive is created as part of the Iceberg build.
You can run the build via:
```bash
./gradlew -x test -x integrationTest clean build
```
The zip archive will be found under `./kafka-connect/kafka-connect-runtime/build/distributions`. There is
one distribution that bundles the Hive Metastore client and related dependencies, and one that does not.
Copy the distribution archive into the Kafka Connect plugins directory on all nodes.

## Requirements

The sink relies on [KIP-447](https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics)
for exactly-once semantics. This requires Kafka 2.5 or later.

## Configuration

| Property                                    | Description                                                                                                       |
|---------------------------------------------|-------------------------------------------------------------------------------------------------------------------|
| iceberg.tables                              | Comma-separated list of destination tables                                                                        |
| iceberg.tables.dynamic-enabled              | Set to `true` to route to a table specified in `routeField` instead of using `routeRegex`, default is `false`     |
| iceberg.tables.route-field                  | For multi-table fan-out, the name of the field used to route records to tables                                    |
| iceberg.tables.default-commit-branch        | Default branch for commits, main is used if not specified                                                         |
| iceberg.tables.default-id-columns           | Default comma-separated list of columns that identify a row in tables (primary key)                               |
| iceberg.tables.default-partition-by         | Default comma-separated list of partition field names to use when creating tables                                 |
| iceberg.tables.auto-create-enabled          | Set to `true` to automatically create destination tables, default is `false`                                      |
| iceberg.tables.evolve-schema-enabled        | Set to `true` to add any missing record fields to the table schema, default is `false`                            |
| iceberg.tables.schema-force-optional        | Set to `true` to set columns as optional during table create and evolution, default is `false` to respect schema  |
| iceberg.tables.schema-case-insensitive      | Set to `true` to look up table columns by case-insensitive name, default is `false` for case-sensitive            |
| iceberg.tables.auto-create-props.*          | Properties set on new tables during auto-create                                                                   |
| iceberg.tables.write-props.*                | Properties passed through to Iceberg writer initialization, these take precedence                                 |
| iceberg.table.\<table name\>.commit-branch  | Table-specific branch for commits, `iceberg.tables.default-commit-branch` is used if not specified                |
| iceberg.table.\<table name\>.id-columns     | Comma-separated list of columns that identify a row in the table (primary key)                                    |
| iceberg.table.\<table name\>.partition-by   | Comma-separated list of partition fields to use when creating the table                                           |
| iceberg.table.\<table name\>.route-regex    | The regex used to match a record's `routeField` to a table                                                        |
| iceberg.control.topic                       | Name of the control topic, default is `control-iceberg`                                                           |
| iceberg.control.commit.interval-ms          | Commit interval in msec, default is 300,000 (5 min)                                                               |
| iceberg.control.commit.timeout-ms           | Commit timeout interval in msec, default is 30,000 (30 sec)                                                       |
| iceberg.control.commit.threads              | Number of threads to use for commits, default is (cores * 2)                                                      |
| iceberg.catalog                             | Name of the catalog, default is `iceberg`                                                                         |
| iceberg.catalog.*                           | Properties passed through to Iceberg catalog initialization                                                       |
| iceberg.hadoop-conf-dir                     | If specified, Hadoop config files in this directory will be loaded                                                |
| iceberg.hadoop.*                            | Properties passed through to the Hadoop configuration                                                             |
| iceberg.kafka.*                             | Properties passed through to control topic Kafka client initialization                                            |
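For illustration only (the table name, column, and values below are hypothetical), several of these properties
might be combined in a connector config like this:

```
"iceberg.tables": "default.orders",
"iceberg.tables.auto-create-enabled": "true",
"iceberg.tables.evolve-schema-enabled": "true",
"iceberg.tables.default-id-columns": "order_id",
"iceberg.control.commit.interval-ms": "60000",
```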

If `iceberg.tables.dynamic-enabled` is `false` (the default) then you must specify `iceberg.tables`. If
`iceberg.tables.dynamic-enabled` is `true` then you must specify `iceberg.tables.route-field` which will
contain the name of the table.

### Kafka configuration

By default the connector will attempt to use Kafka client config from the worker properties for connecting to
the control topic. If that config cannot be read for some reason, Kafka client settings
can be set explicitly using `iceberg.kafka.*` properties.

#### Message format

Messages should be converted to a struct or map using the appropriate Kafka Connect converter.

### Catalog configuration

The `iceberg.catalog.*` properties are required for connecting to the Iceberg catalog. The core catalog
types are included in the default distribution, including REST, Glue, DynamoDB, Hadoop, Nessie,
JDBC, and Hive. JDBC drivers are not included in the default distribution, so you will need to include
those if needed. When using a Hive catalog, you can use the distribution that includes the Hive metastore client,
otherwise you will need to include that yourself.

To set the catalog type, you can set `iceberg.catalog.type` to `rest`, `hive`, or `hadoop`. For other
catalog types, you need to instead set `iceberg.catalog.catalog-impl` to the name of the catalog class.

#### REST example

```
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://catalog-service",
"iceberg.catalog.credential": "<credential>",
"iceberg.catalog.warehouse": "<warehouse>",
```

#### Hive example

NOTE: Use the distribution that includes the HMS client (or include the HMS client yourself). Use `S3FileIO` when
using S3 for storage (the default is `HadoopFileIO` with `HiveCatalog`).
```
"iceberg.catalog.type": "hive",
"iceberg.catalog.uri": "thrift://hive:9083",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.client.region": "us-east-1",
"iceberg.catalog.s3.access-key-id": "<AWS access>",
"iceberg.catalog.s3.secret-access-key": "<AWS secret>",
```

#### Glue example

```
"iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
```

#### Nessie example

```
"iceberg.catalog.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
"iceberg.catalog.uri": "http://localhost:19120/api/v2",
"iceberg.catalog.ref": "main",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
```
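#### Hadoop example

A minimal sketch for a filesystem-based Hadoop catalog, which only needs a warehouse location (the HDFS path
below is a placeholder):

```
"iceberg.catalog.type": "hadoop",
"iceberg.catalog.warehouse": "hdfs://namenode:8020/warehouse",
```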
#### Notes

Depending on your setup, you may also need to set `iceberg.catalog.s3.endpoint`, `iceberg.catalog.s3.staging-dir`,
or `iceberg.catalog.s3.path-style-access`. See the [Iceberg docs](https://iceberg.apache.org/docs/latest/) for
full details on configuring catalogs.

### Azure ADLS configuration example

When using ADLS, Azure requires that AZURE_CLIENT_ID, AZURE_TENANT_ID, and AZURE_CLIENT_SECRET be provided to its Java SDK.
If you're running Kafka Connect in a container, be sure to inject those values as environment variables. See the
[Azure Identity Client library for Java](https://learn.microsoft.com/en-us/java/api/overview/azure/identity-readme?view=azure-java-stable) for more information.

An example of these would be:
```
AZURE_CLIENT_ID=e564f687-7b89-4b48-80b8-111111111111
AZURE_TENANT_ID=95f2f365-f5b7-44b1-88a1-111111111111
AZURE_CLIENT_SECRET="XXX"
```
Here, the CLIENT_ID is the Application ID of a registered application under
[App Registrations](https://portal.azure.com/#view/Microsoft_AAD_RegisteredApps/ApplicationsListBlade), the TENANT_ID is
from your [Azure Tenant Properties](https://portal.azure.com/#view/Microsoft_AAD_IAM/TenantProperties.ReactView), and
the CLIENT_SECRET is created within the "Certificates & Secrets" section, under "Manage", after choosing your specific
App Registration. You might have to choose "Client secrets" in the middle panel and the "+" in front of "New client secret"
to generate one. Be sure to set this variable to the secret's Value and not its Id.

It's also important that the App Registration is granted the "Storage Blob Data Contributor" role assignment in your
Storage Account's Access Control (IAM), or it won't be able to write new files there.

Then, within the connector's configuration, you'll want to include the following:

```
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://catalog:8181",
"iceberg.catalog.warehouse": "abfss://storage-container-n...@storageaccount.dfs.core.windows.net/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.azure.adlsv2.ADLSFileIO",
"iceberg.catalog.include-credentials": "true"
```

Here, `storage-container-name` is the container name within your Azure Storage Account, `/warehouse` is the location
within that container where your Apache Iceberg files will be written by default (or when `iceberg.tables.auto-create-enabled` is `true`),
and the `include-credentials` parameter passes the Azure Java client credentials along. This configures the
Iceberg Sink connector to connect to the REST catalog implementation at `iceberg.catalog.uri` to obtain the required
connection string for the ADLSv2 client.

### Google GCS configuration example

By default, Application Default Credentials (ADC) will be used to connect to GCS. Details on how ADC works can
be found in the [Google Cloud documentation](https://cloud.google.com/docs/authentication/application-default-credentials).

```
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://catalog:8181",
"iceberg.catalog.warehouse": "gs://bucket-name/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.google.gcs.GCSFileIO"
```

### Hadoop configuration

When using HDFS or Hive, the sink will initialize the Hadoop configuration. First, config files
from the classpath are loaded. Next, if `iceberg.hadoop-conf-dir` is specified, config files
are loaded from that location. Finally, any `iceberg.hadoop.*` properties from the sink config are
applied. When merging these, the order of precedence is sink config > config dir > classpath.
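As a sketch of how those layers combine (the directory path and the property value below are placeholders),
a config directory plus a single overriding Hadoop property might look like this, where the sink-level
`dfs.replication` value would win over the same key loaded from the config dir or classpath:

```
"iceberg.hadoop-conf-dir": "/etc/hadoop/conf",
"iceberg.hadoop.dfs.replication": "2",
```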
## Examples

### Initial setup

#### Source topic

This assumes the source topic already exists and is named `events`.

#### Control topic

If your Kafka cluster has `auto.create.topics.enable` set to `true` (the default), then the control topic will be
automatically created. If not, then you will need to create the topic first. The default topic name is `control-iceberg`:
```bash
bin/kafka-topics \
  --command-config command-config.props \
  --bootstrap-server ${CONNECT_BOOTSTRAP_SERVERS} \
  --create \
  --topic control-iceberg \
  --partitions 1
```
*NOTE: Clusters running on Confluent Cloud have `auto.create.topics.enable` set to `false` by default.*

#### Iceberg catalog configuration

Configuration properties with the prefix `iceberg.catalog.` will be passed to Iceberg catalog initialization.
See the [Iceberg docs](https://iceberg.apache.org/docs/latest/) for details on how to configure
a particular catalog.

### Single destination table

This example writes all incoming records to a single table.

#### Create the destination table

```sql
CREATE TABLE default.events (
    id STRING,
    type STRING,
    ts TIMESTAMP,
    payload STRING)
PARTITIONED BY (hours(ts))
```

#### Connector config

This example config connects to an Iceberg REST catalog.
```json
{
"name": "events-sink",
"config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables": "default.events",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
    }
}
```

### Multi-table fan-out, static routing

This example writes records with `type` set to `list` to the table `default.events_list`, and
writes records with `type` set to `create` to the table `default.events_create`. Other records
will be skipped.

#### Create two destination tables

```sql
CREATE TABLE default.events_list (
    id STRING,
    type STRING,
    ts TIMESTAMP,
    payload STRING)
PARTITIONED BY (hours(ts));

CREATE TABLE default.events_create (
    id STRING,
    type STRING,
    ts TIMESTAMP,
    payload STRING)
PARTITIONED BY (hours(ts));
```

#### Connector config

```json
{
"name": "events-sink",
"config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables": "default.events_list,default.events_create",
    "iceberg.tables.route-field": "type",
    "iceberg.table.default.events_list.route-regex": "list",
    "iceberg.table.default.events_create.route-regex": "create",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
    }
}
```
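Because `route-regex` values are regular expressions, a single destination table can also match more than one
value of the route field. As a hypothetical variation on the config above (the table name is made up), records
with `type` of either `list` or `create` could be routed to one table:

```
"iceberg.tables": "default.events_all",
"iceberg.tables.route-field": "type",
"iceberg.table.default.events_all.route-regex": "list|create",
```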
### Multi-table fan-out, dynamic routing

This example writes to tables with names taken from the value in the `db_table` field. If a table with
that name does not exist, then the record will be skipped. For example, if the record's `db_table`
field is set to `default.events_list`, then the record is written to the `default.events_list` table.

#### Create two destination tables

See above for creating two tables.

#### Connector config

```json
{
"name": "events-sink",
"config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables.dynamic-enabled": "true",
    "iceberg.tables.route-field": "db_table",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
    }
}
```

diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml
index 604fede583..edafb727d3 100644
--- a/docs/mkdocs.yml
+++ b/docs/mkdocs.yml
@@ -64,6 +64,7 @@ nav:
     - Impala: https://impala.apache.org/docs/build/html/topics/impala_iceberg.html
     - Doris: https://doris.apache.org/docs/dev/lakehouse/datalake-analytics/iceberg
     - Druid: https://druid.apache.org/docs/latest/development/extensions-contrib/iceberg/
+    - Kafka Connect: kafka-connect.md
   - Integrations:
     - aws.md
     - dell.md