[
https://issues.apache.org/jira/browse/BEAM-5964?focusedWorklogId=170481&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-170481
]
ASF GitHub Bot logged work on BEAM-5964:
----------------------------------------
Author: ASF GitHub Bot
Created on: 28/Nov/18 23:19
Start Date: 28/Nov/18 23:19
Worklog Time Spent: 10m
Work Description: chamikaramj commented on a change in pull request
#7006: [BEAM-5964] Add ClickHouseIO.Write
URL: https://github.com/apache/beam/pull/7006#discussion_r237239967
##########
File path:
sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
##########
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.clickhouse;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType;
+import org.apache.beam.sdk.io.clickhouse.TableSchema.DefaultType;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ru.yandex.clickhouse.ClickHouseConnection;
+import ru.yandex.clickhouse.ClickHouseDataSource;
+import ru.yandex.clickhouse.ClickHouseStatement;
+import ru.yandex.clickhouse.settings.ClickHouseQueryParam;
+
+/** An IO to write to ClickHouse. */
+@Experimental(Experimental.Kind.SCHEMAS)
+public class ClickHouseIO {
+
+ /** A {@link PTransform} to write to ClickHouse. */
+ @AutoValue
+ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+ public abstract String jdbcUrl();
+
+ public abstract String table();
+
+ @Nullable
+ public abstract Properties properties();
+
+ public static TableSchema getTableSchema(String jdbcUrl, String table) {
+ ResultSet rs;
+ try (ClickHouseConnection connection = new ClickHouseDataSource(jdbcUrl).getConnection();
+ Statement statement = connection.createStatement()) {
+ rs = statement.executeQuery("DESCRIBE TABLE " + table);
+ List<TableSchema.Column> columns = new ArrayList<>();
+
+ while (rs.next()) {
+ String name = rs.getString("name");
+ String type = rs.getString("type");
+ String defaultTypeStr = rs.getString("default_type");
+ String defaultExpression = rs.getString("default_expression");
+
+ ColumnType columnType = ColumnType.parse(type);
+ DefaultType defaultType = DefaultType.parse(defaultTypeStr).orElse(null);
+
+ Object defaultValue;
+ if (DefaultType.DEFAULT.equals(defaultType)
+ && !Strings.isNullOrEmpty(defaultExpression)) {
+ defaultValue = ColumnType.parseDefaultExpression(columnType, defaultExpression);
+ } else {
+ defaultValue = null;
+ }
+
+ columns.add(TableSchema.Column.of(name, columnType, defaultType, defaultValue));
+ }
+
+ // findbugs doesn't like it in try block
+ rs.close();
+
+ return TableSchema.of(columns.toArray(new TableSchema.Column[0]));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public PDone expand(PCollection<T> input) {
+ TableSchema tableSchema = getTableSchema(jdbcUrl(), table());
+ Properties properties = properties() == null ? new Properties() : properties();
+
+ input.apply(ParDo.of(WriteFn.create(jdbcUrl(), table(), tableSchema, properties)));
+
+ return PDone.in(input.getPipeline());
+ }
+
+ public static <T> Builder<T> builder() {
+ return new AutoValue_ClickHouseIO_Write.Builder<>();
+ }
+
+ /** Builder for {@link Write}. */
+ @AutoValue.Builder
+ public abstract static class Builder<T> {
+
+ public abstract Builder<T> jdbcUrl(String jdbcUrl);
+
+ public abstract Builder<T> table(String table);
+
+ public abstract Builder<T> properties(Properties properties);
+
+ public abstract Write<T> build();
+ }
+ }
+
+ public static ClickHouseProperties properties() {
+ return new ClickHouseProperties();
+ }
+
+ /**
+ * Builder for {@link Properties} for JDBC connection.
+ *
+ * @see <a href="https://clickhouse.yandex/docs/en/single/#settings_1">ClickHouse
+ * documentation</a>
+ */
+ public static class ClickHouseProperties implements Serializable {
+ private final Properties properties = new Properties();
+
+ /**
+ * Recommendation for what size of block (in number of rows) to load from tables.
+ *
+ * @see <a href="https://clickhouse.yandex/docs/en/single/#max_block_size">ClickHouse
+ * documentation</a>
+ * @param value number of rows
+ * @return builder
+ */
+ public ClickHouseProperties maxBlockSize(int value) {
+ return set(ClickHouseQueryParam.MAX_BLOCK_SIZE, value);
+ }
+
+ /**
+ * The maximum block size for insertion, if we control the creation of blocks for insertion.
+ *
+ * @see <a href="https://clickhouse.yandex/docs/en/single/#max_insert_block_size">ClickHouse
+ * documentation</a>
+ * @param value number of rows
+ * @return builder
+ */
+ public ClickHouseProperties maxInsertBlockSize(long value) {
+ return set(ClickHouseQueryParam.MAX_INSERT_BLOCK_SIZE, value);
+ }
+
+ /**
+ * If this setting is enabled, an insert query into a distributed table waits until the data
+ * has been sent to all nodes in the cluster.
+ *
+ * @param value true to enable
+ * @return builder
+ */
+ public ClickHouseProperties insertDistributedSync(boolean value) {
+ return set("insert_distributed_sync", value ? 1 : 0);
+ }
+
+ /**
+ * For INSERT queries into a replicated table, wait until the data is written to the specified
+ * number of replicas and linearize the addition of the data. 0 disables the setting.
+ *
+ * @see <a href="https://clickhouse.yandex/docs/en/single/#insert_quorum">ClickHouse
+ * documentation</a>
+ * @param value number of replicas, 0 for disabling
+ * @return builder
+ */
+ public ClickHouseProperties insertQuorum(long value) {
+ return set(ClickHouseQueryParam.INSERT_QUORUM, value);
+ }
+
+ /**
+ * For INSERT queries into a replicated table, specifies that deduplication of inserted blocks
+ * should be performed.
+ *
+ * @param value true to enable
+ * @return builder
+ */
+ public ClickHouseProperties insertDeduplicate(boolean value) {
+ return set("insert_deduplicate", value ? 1L : 0L);
+ }
+
+ public ClickHouseProperties set(ClickHouseQueryParam param, Object value) {
+ Preconditions.checkArgument(param.getClazz().isInstance(value));
+ properties.put(param, value);
+ return this;
+ }
+
+ public ClickHouseProperties set(String param, Object value) {
+ properties.put(param, value);
+ return this;
+ }
+
+ public Properties build() {
+ return properties;
+ }
+ }
+
+ @AutoValue
+ abstract static class WriteFn<T> extends DoFn<T, Void> {
+ private static final Logger LOG = LoggerFactory.getLogger(WriteFn.class);
+ private static final int QUEUE_SIZE = 1024;
+
+ private ClickHouseConnection connection;
+ private ClickHouseStatement statement;
+
+ private ExecutorService executor;
+ private BlockingQueue<Row> queue;
+ private AtomicBoolean bundleFinished;
+ private Future<?> insert;
+
+ // TODO: This should be the same as resolved so that Beam knows which fields
+ // are being accessed. Currently Beam only supports wildcard descriptors.
+ // Once BEAM-4457 is fixed, fix this.
+ @FieldAccess("filterFields")
+ final FieldAccessDescriptor fieldAccessDescriptor = FieldAccessDescriptor.withAllFields();
+
+ public abstract String jdbcUrl();
+
+ public abstract String table();
+
+ public abstract TableSchema schema();
+
+ public abstract Properties properties();
+
+ public static <T> WriteFn<T> create(
+ String jdbcUrl, String table, TableSchema schema, Properties properties) {
+ return new AutoValue_ClickHouseIO_WriteFn<>(jdbcUrl, table, schema, properties);
+ }
+
+ static String quoteIdentifier(String identifier) {
+ String backslash = "\\\\";
+ String quote = "\"";
+
+ return quote + identifier.replaceAll(quote, backslash + quote) + quote;
+ }
+
+ @VisibleForTesting
+ static String insertSql(TableSchema schema, String table) {
+ String columnsStr =
+ schema
+ .columns()
+ .stream()
+ .filter(x -> !x.materializedOrAlias())
+ .map(x -> quoteIdentifier(x.name()))
+ .collect(Collectors.joining(", "));
+ return "INSERT INTO " + quoteIdentifier(table) + " (" + columnsStr + ")";
+ }
+
+ @Setup
+ public void setup() throws SQLException {
+ connection = new ClickHouseDataSource(jdbcUrl(), properties()).getConnection();
+ executor =
+ Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat("clickhouse-jdbc-%d").build());
+ queue = new ArrayBlockingQueue<>(QUEUE_SIZE);
+ bundleFinished = new AtomicBoolean(false);
+ }
+
+ @StartBundle
+ public void startBundle() throws SQLException {
+ statement = connection.createStatement();
+ bundleFinished.set(false);
+
+ // When bundle starts, we open statement and stream data in bundle.
+ // When we finish bundle, we close statement.
+
+ // For streaming, we create a background thread for http client,
+ // and we feed it through BlockingQueue.
+
+ insert =
+ executor.submit(
+ () -> {
+ try {
+ statement.sendRowBinaryStream(
+ insertSql(schema(), table()),
+ stream -> {
+ while (true) {
+ Row row;
+ try {
+ row = queue.poll(1, TimeUnit.SECONDS);
Review comment:
Why not just buffer data in processElement() and flush as needed in
processElement() and finishBundle()? Is there a need to create this thread and
poll for data? I think the former approach (taken by most IO connectors) would be
simpler and perform just as well.
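For illustration, a minimal sketch of that buffering approach, assuming the WriteFn
members shown in the diff above (connection, schema(), table(), insertSql()) and that
the element type is Row; BATCH_SIZE and writeRow() are hypothetical names, not part
of the PR:

private final List<Row> buffer = new ArrayList<>();
private static final int BATCH_SIZE = 1_000_000; // hypothetical flush threshold

@ProcessElement
public void processElement(@Element Row row) throws Exception {
  buffer.add(row); // buffer in memory instead of handing rows to a background thread
  if (buffer.size() >= BATCH_SIZE) {
    flush();
  }
}

@FinishBundle
public void finishBundle() throws Exception {
  if (!buffer.isEmpty()) {
    flush(); // write whatever is left when the bundle ends
  }
}

private void flush() throws SQLException {
  try (ClickHouseStatement stmt = connection.createStatement()) {
    stmt.sendRowBinaryStream(
        insertSql(schema(), table()),
        stream -> {
          for (Row row : buffer) {
            writeRow(stream, schema(), row); // hypothetical Row-to-binary serializer
          }
        });
  }
  buffer.clear();
}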
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 170481)
Time Spent: 2h 20m (was: 2h 10m)
> Add ClickHouseIO.Write
> ----------------------
>
> Key: BEAM-5964
> URL: https://issues.apache.org/jira/browse/BEAM-5964
> Project: Beam
> Issue Type: New Feature
> Components: io-ideas
> Reporter: Gleb Kanterov
> Assignee: Gleb Kanterov
> Priority: Major
> Time Spent: 2h 20m
> Remaining Estimate: 0h
>
> h3. Motivation
> ClickHouse is an open-source columnar DBMS for OLAP. It allows analysis of data
> that is updated in real time. The project was released as open-source
> software under the Apache 2 license in June 2016.
> h3. Design and implementation
> 1. Support only writes; reads aren't useful because ClickHouse is designed for
> OLAP queries
> 2. For writes, insert in batches and rely on the idempotent and atomic inserts
> supported by replicated tables in ClickHouse
> 3. Implement ClickHouseIO.Write as PTransform<PCollection<Row>, PDone> (see the
> usage sketch below)
> 4. Rely on the logic for casting rows between schemas from BEAM-5918, and
> don't put it in ClickHouseIO.Write
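> A hypothetical usage sketch based on points 2 and 3 above and on the builder API in
> the PR; the JDBC URL, table name, and property values are placeholders, and rows is
> assumed to be a PCollection<Row> whose schema matches the target table:
> {code:java}
> Properties props =
>     ClickHouseIO.properties()
>         .maxInsertBlockSize(1_000_000L) // batch size for inserts
>         .insertQuorum(2L)               // wait for two replicas on replicated tables
>         .insertDeduplicate(true)        // rely on ClickHouse block deduplication
>         .build();
>
> rows.apply(
>     "WriteToClickHouse",
>     ClickHouseIO.Write.<Row>builder()
>         .jdbcUrl("jdbc:clickhouse://localhost:8123/default")
>         .table("my_table")
>         .properties(props)
>         .build());
> {code}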
> h3. References
> [1]
> http://highscalability.com/blog/2017/9/18/evolution-of-data-structures-in-yandexmetrica.html
> [2]
> https://blog.cloudflare.com/how-cloudflare-analyzes-1m-dns-queries-per-second/
> [3]
> https://blog.cloudflare.com/http-analytics-for-6m-requests-per-second-using-clickhouse/
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)