This is an automated email from the ASF dual-hosted git repository. baomingyu pushed a commit to branch inlongmsg-tlog-csv in repository https://gitbox.apache.org/repos/asf/inlong.git
commit b69dba03ce2fcae2d60a13bc09d549fd25e692ca Author: nicobao <[email protected]> AuthorDate: Mon Jan 22 16:39:46 2024 +0800 [INLONG-9597][Sort] Support row way of sort InLong message tlog-csv format --- .../format-row/format-inlongmsg-tlogcsv/pom.xml | 130 ++++++++++ .../formats/inlongmsgtlogcsv/InLongMsgTlogCsv.java | 72 ++++++ .../InLongMsgTlogCsvFormatDeserializer.java | 264 +++++++++++++++++++ .../InLongMsgTlogCsvFormatFactory.java | 158 ++++++++++++ .../InLongMsgTlogCsvMixedFormatConverter.java | 163 ++++++++++++ .../InLongMsgTlogCsvMixedFormatDeserializer.java | 174 +++++++++++++ .../inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java | 170 ++++++++++++ .../InLongMsgTlogCsvValidator.java | 40 +++ .../org.apache.flink.table.factories.TableFactory | 16 ++ .../InLongMsgTlogCsvFormatDeserializerTest.java | 285 +++++++++++++++++++++ .../InLongMsgTlogCsvFormatFactoryTest.java | 121 +++++++++ .../inlongmsgtlogcsv/InLongMsgTlogCsvTest.java | 108 ++++++++ .../src/test/resources/log4j-test.properties | 27 ++ inlong-sort/sort-formats/format-row/pom.xml | 1 + 14 files changed, 1729 insertions(+) diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/pom.xml b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/pom.xml new file mode 100644 index 0000000000..d32673c7a0 --- /dev/null +++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/pom.xml @@ -0,0 +1,130 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.inlong</groupId> + <artifactId>format-row</artifactId> + <version>1.11.0-SNAPSHOT</version> + </parent> + + <artifactId>sort-format-inlongmsg-tlogcsv</artifactId> + <packaging>jar</packaging> + <name>Apache InLong - Sort Format-InLongMsg-TLogCSV</name> + + <properties> + <inlong.root.dir>${project.parent.parent.parent.parent.basedir}</inlong.root.dir> + </properties> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-format-common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-format-base</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-format-inlongmsg-base</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <version>${snappy.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-jackson</artifactId> + <version>${flink.jackson.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + 
<artifactId>flink-core</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${flink.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + </dependencies> + + <profiles> + <!-- Build an additional jar with the "sql-jar" classifier during the package phase; this profile is active unless the skipSqlJars property is set --> + <profile> + <id>sql-jars</id> + <activation> + <property> + <name>!skipSqlJars</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>jar</goal> + </goals> + <phase>package</phase> + <configuration> + <classifier>sql-jar</classifier> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> +</project> diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsv.java b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsv.java new file mode 100644 index 0000000000..f5c5fb53fa --- /dev/null +++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsv.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.inlongmsgtlogcsv; + +import org.apache.inlong.sort.formats.base.TextFormatDescriptor; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER; + +/** + * Format descriptor for the InLongMsg TLog CSV format, identified by + * {@link #FORMAT_TYPE_VALUE}. + * + * <p>On top of what {@link TextFormatDescriptor} offers, this descriptor lets callers + * set the field delimiter and the names of the time and attributes fields. Every + * setter returns this descriptor so that calls can be chained. + */ +public class InLongMsgTlogCsv extends TextFormatDescriptor<InLongMsgTlogCsv> { + + public static final String FORMAT_TYPE_VALUE = "InLongMsg-TLOGCSV"; + + public InLongMsgTlogCsv() { + super(FORMAT_TYPE_VALUE, 1); + } + + /** + * Sets the delimiter character (',' by default). + * + * @param delimiter the field delimiter character + * @return this descriptor + */ + public InLongMsgTlogCsv delimiter(char delimiter) { + internalProperties.putCharacter(FORMAT_DELIMITER, delimiter); + return this; + } + + /** + * Sets the name of the time field. + * + * @param timeFieldName The name of the time field; must not be null. 
+ * @return this descriptor + */ + public InLongMsgTlogCsv timeFieldName(String timeFieldName) { + checkNotNull(timeFieldName); + internalProperties.putString(InLongMsgUtils.FORMAT_TIME_FIELD_NAME, timeFieldName); + return this; + } + + /** + * Sets the name of the attributes field. + * + * @param attributesFieldName The name of the attributes field; must not be null. + * @return this descriptor + */ + public InLongMsgTlogCsv attributesFieldName(String attributesFieldName) { + checkNotNull(attributesFieldName); + internalProperties.putString(InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME, attributesFieldName); + return this; + } +} diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java new file mode 100644 index 0000000000..191df7fd16 --- /dev/null +++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.formats.inlongmsgtlogcsv; + +import org.apache.inlong.sort.formats.base.TextFormatBuilder; +import org.apache.inlong.sort.formats.common.RowFormatInfo; +import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer; +import org.apache.inlong.sort.formats.inlongmsg.FailureHandler; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.types.Row; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME; + +/** + * Deserializer for records in the InLongMsgTlogCsv format. + * + * <p>Head and body parsing is delegated to {@link InLongMsgTlogCsvUtils}. Each message + * yields exactly one parsed body, and each body is converted into exactly one + * {@link Row}: the data row is deserialized from the predefined and body fields and + * then decorated with the time and attributes head fields by {@link InLongMsgUtils}. 
+ */ +public final class InLongMsgTlogCsvFormatDeserializer extends AbstractInLongMsgFormatDeserializer { + + private static final long serialVersionUID = 1L; + + /** + * Format information describing the result type. + */ + @Nonnull + private final RowFormatInfo rowFormatInfo; + + /** + * The name of the time field. + */ + @Nullable + private final String timeFieldName; + + /** + * The name of the attributes field. 
+ */ + @Nullable + private final String attributesFieldName; + + /** + * The charset of the text. + */ + private final String charset; + + /** + * The delimiter between fields. + */ + @Nonnull + private final Character delimiter; + + /** + * Escape character. Null if escaping is disabled. + */ + @Nullable + private final Character escapeChar; + + /** + * Quote character. Null if quoting is disabled. + */ + @Nullable + private final Character quoteChar; + + /** + * The literal that represents null values, default "". + */ + @Nullable + private final String nullLiteral; + + public InLongMsgTlogCsvFormatDeserializer( + @Nonnull RowFormatInfo rowFormatInfo, + @Nullable String timeFieldName, + @Nullable String attributesFieldName, + @Nonnull String charset, + @Nonnull Character delimiter, + @Nullable Character escapeChar, + @Nullable Character quoteChar, + @Nullable String nullLiteral, + @Nonnull Boolean ignoreErrors) { + this( + rowFormatInfo, + timeFieldName, + attributesFieldName, + charset, + delimiter, + escapeChar, + quoteChar, + nullLiteral, + InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors)); + } + + public InLongMsgTlogCsvFormatDeserializer( + @Nonnull RowFormatInfo rowFormatInfo, + @Nullable String timeFieldName, + @Nullable String attributesFieldName, + @Nonnull String charset, + @Nonnull Character delimiter, + @Nullable Character escapeChar, + @Nullable Character quoteChar, + @Nullable String nullLiteral, + @Nonnull FailureHandler failureHandler) { + super(failureHandler); + + this.rowFormatInfo = rowFormatInfo; + this.timeFieldName = timeFieldName; + this.attributesFieldName = attributesFieldName; + this.charset = charset; + this.delimiter = delimiter; + this.escapeChar = escapeChar; + this.quoteChar = quoteChar; + this.nullLiteral = nullLiteral; + } + + @Override + public TypeInformation<Row> getProducedType() { + return InLongMsgUtils.decorateRowTypeWithNeededHeadFields(timeFieldName, attributesFieldName, rowFormatInfo); + } + + @Override + protected 
InLongMsgHead parseHead(String attr) throws Exception { + return InLongMsgTlogCsvUtils.parseHead(attr); + } + + @Override + protected List<InLongMsgBody> parseBodyList(byte[] bytes) throws Exception { + return Collections.singletonList( + InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, escapeChar, quoteChar)); + } + + @Override + protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) throws Exception { + Row dataRow = + InLongMsgTlogCsvUtils.deserializeRow( + rowFormatInfo, + nullLiteral, + head.getPredefinedFields(), + body.getFields()); + + Row row = InLongMsgUtils.decorateRowWithNeededHeadFields( + timeFieldName, + attributesFieldName, + head.getTime(), + head.getAttributes(), + dataRow); + return Collections.singletonList(row); + } + + /** + * The builder for {@link InLongMsgTlogCsvFormatDeserializer}. + * + * <p>The time field name, attributes field name and delimiter start from their + * default constants and can be overridden through the setters or through + * {@link #configure(DescriptorProperties)}, which only overrides the values that + * are present in the given properties. 
+ */ + public static class Builder extends TextFormatBuilder<Builder> { + + private String timeFieldName = DEFAULT_TIME_FIELD_NAME; + private String attributesFieldName = DEFAULT_ATTRIBUTES_FIELD_NAME; + private Character delimiter = DEFAULT_DELIMITER; + + public Builder(RowFormatInfo rowFormatInfo) { + super(rowFormatInfo); + } + + public Builder setTimeFieldName(String timeFieldName) { + this.timeFieldName = timeFieldName; + return this; + } + + public Builder setAttributesFieldName(String attributesFieldName) { + this.attributesFieldName = attributesFieldName; + return this; + } + + public Builder setDelimiter(Character delimiter) { + this.delimiter = delimiter; + return this; + } + + public Builder configure(DescriptorProperties descriptorProperties) { + super.configure(descriptorProperties); + + descriptorProperties.getOptionalString(FORMAT_TIME_FIELD_NAME) + .ifPresent(this::setTimeFieldName); + descriptorProperties.getOptionalString(FORMAT_ATTRIBUTES_FIELD_NAME) + .ifPresent(this::setAttributesFieldName); + descriptorProperties.getOptionalCharacter(FORMAT_DELIMITER) + .ifPresent(this::setDelimiter); + + return this; + } + + 
public InLongMsgTlogCsvFormatDeserializer build() { + return new InLongMsgTlogCsvFormatDeserializer( + rowFormatInfo, + timeFieldName, + attributesFieldName, + charset, + delimiter, + escapeChar, + quoteChar, + nullLiteral, + ignoreErrors); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + if (!super.equals(o)) { + return false; + } + + InLongMsgTlogCsvFormatDeserializer that = (InLongMsgTlogCsvFormatDeserializer) o; + return rowFormatInfo.equals(that.rowFormatInfo) && + Objects.equals(timeFieldName, that.timeFieldName) && + Objects.equals(attributesFieldName, that.attributesFieldName) && + Objects.equals(charset, that.charset) && + delimiter.equals(that.delimiter) && + Objects.equals(escapeChar, that.escapeChar) && + Objects.equals(quoteChar, that.quoteChar) && + Objects.equals(nullLiteral, that.nullLiteral); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName, + attributesFieldName, charset, delimiter, escapeChar, quoteChar, + nullLiteral); + } +} diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactory.java b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactory.java new file mode 100644 index 0000000000..bb83ad6558 --- /dev/null +++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactory.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.inlongmsgtlogcsv; + +import org.apache.inlong.sort.formats.base.TableFormatDeserializer; +import org.apache.inlong.sort.formats.base.TableFormatDeserializer.TableFormatContext; +import org.apache.inlong.sort.formats.base.TableFormatDeserializerFactory; +import org.apache.inlong.sort.formats.common.RowFormatInfo; +import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter; +import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterValidator; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatDeserializerValidator; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatFactory; + +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.factories.TableFormatFactoryBase; +import org.apache.flink.types.Row; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_CHARSET; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ESCAPE_CHARACTER; +import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_QUOTE_CHARACTER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getDataRowFormatInfo; + +/** + * Table format factory providing configured InLongMsgTlogCsv-to-row deserializers, + * mixed-format deserializers and mixed-format converters. + * + * <p>Every property-based factory method copies the given properties into a + * {@link DescriptorProperties}, runs the matching validator on them, and then + * configures and builds the result through the corresponding builder. The + * context-based overloads delegate to the property-based ones using + * {@code context.getFormatProperties()}. 
+ */ +public final class InLongMsgTlogCsvFormatFactory + extends + TableFormatFactoryBase<Row> + implements + TableFormatDeserializerFactory, + InLongMsgMixedFormatFactory { + + public InLongMsgTlogCsvFormatFactory() { + super(InLongMsgTlogCsv.FORMAT_TYPE_VALUE, 1, false); + } + + @Override + public List<String> supportedFormatProperties() { + final List<String> properties = new ArrayList<>(); + properties.add(FORMAT_DELIMITER); + properties.add(FORMAT_ESCAPE_CHARACTER); + properties.add(FORMAT_QUOTE_CHARACTER); + properties.add(FORMAT_NULL_LITERAL); + properties.add(FORMAT_CHARSET); + properties.add(FORMAT_IGNORE_ERRORS); + properties.add(FORMAT_SCHEMA); + properties.add(FORMAT_TIME_FIELD_NAME); + properties.add(FORMAT_ATTRIBUTES_FIELD_NAME); + + return properties; + } + + @Override + public InLongMsgTlogCsvFormatDeserializer createFormatDeserializer( + Map<String, String> properties) { + final DescriptorProperties descriptorProperties = + new DescriptorProperties(true); + descriptorProperties.putProperties(properties); + + InLongMsgTlogCsvValidator validator = new InLongMsgTlogCsvValidator(); + validator.validate(descriptorProperties); + + RowFormatInfo rowFormatInfo = 
getDataRowFormatInfo(descriptorProperties); + + InLongMsgTlogCsvFormatDeserializer.Builder builder = + new InLongMsgTlogCsvFormatDeserializer.Builder(rowFormatInfo); + builder.configure(descriptorProperties); + + return builder.build(); + } + + @Override + public TableFormatDeserializer createFormatDeserializer(TableFormatContext context) { + TableFormatDeserializer deserializer = + createFormatDeserializer(context.getFormatProperties()); + deserializer.init(context); + return deserializer; + } + + @Override + public InLongMsgTlogCsvMixedFormatDeserializer createMixedFormatDeserializer( + Map<String, String> properties) { + final DescriptorProperties descriptorProperties = + new DescriptorProperties(true); + descriptorProperties.putProperties(properties); + + InLongMsgMixedFormatDeserializerValidator validator = + new InLongMsgMixedFormatDeserializerValidator(); + validator.validate(descriptorProperties); + + InLongMsgTlogCsvMixedFormatDeserializer.Builder builder = + new InLongMsgTlogCsvMixedFormatDeserializer.Builder(); + builder.configure(descriptorProperties); + + return builder.build(); + } + + @Override + public AbstractInLongMsgMixedFormatConverter createMixedFormatConverter( + AbstractInLongMsgMixedFormatConverter.TableFormatContext context) { + return createMixedFormatConverter(context.getFormatProperties()); + } + + @Override + public AbstractInLongMsgMixedFormatDeserializer createMixedFormatDeserializer( + TableFormatContext context) { + InLongMsgTlogCsvMixedFormatDeserializer deserializer = + createMixedFormatDeserializer(context.getFormatProperties()); + deserializer.init(context); + return deserializer; + } + + @Override + public InLongMsgTlogCsvMixedFormatConverter createMixedFormatConverter( + Map<String, String> properties) { + final DescriptorProperties descriptorProperties = + new DescriptorProperties(true); + descriptorProperties.putProperties(properties); + + InLongMsgMixedFormatConverterValidator validator = + new 
InLongMsgMixedFormatConverterValidator(); + validator.validate(descriptorProperties); + + RowFormatInfo rowFormatInfo = getDataRowFormatInfo(descriptorProperties); + InLongMsgTlogCsvMixedFormatConverter.Builder builder = + new InLongMsgTlogCsvMixedFormatConverter.Builder(rowFormatInfo); + builder.configure(descriptorProperties); + + return builder.build(); + } +} diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java new file mode 100644 index 0000000000..da210ed2ce --- /dev/null +++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.formats.inlongmsgtlogcsv; + +import org.apache.inlong.sort.formats.common.RowFormatInfo; +import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterBuilder; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.sql.Timestamp; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Converter used to deserialize a mixed row in InLongMsg-Tlog-CSV format. + * + * <p>The predefined fields and body fields of the mixed row are deserialized into a + * data {@link Row} according to the row format info, and that row is then decorated + * with the time and attributes head fields. Exactly one row is produced per call to + * {@code convertRows}. + */ +public class InLongMsgTlogCsvMixedFormatConverter extends AbstractInLongMsgMixedFormatConverter { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger( + InLongMsgTlogCsvMixedFormatConverter.class); + + /** + * The schema of the rows. + */ + @Nonnull + private final RowFormatInfo rowFormatInfo; + + /** + * The name of the time field. + */ + @Nullable + private final String timeFieldName; + + /** + * The name of the attributes field. + */ + @Nullable + private final String attributesFieldName; + + /** + * The literal representing null values. 
+ */ + @Nullable + private final String nullLiteral; + + public InLongMsgTlogCsvMixedFormatConverter( + @Nonnull RowFormatInfo rowFormatInfo, + @Nullable String timeFieldName, + @Nullable String attributesFieldName, + @Nullable String nullLiteral, + boolean ignoreErrors) { + super(ignoreErrors); + + this.rowFormatInfo = rowFormatInfo; + this.timeFieldName = timeFieldName; + this.attributesFieldName = attributesFieldName; + this.nullLiteral = nullLiteral; + } + + @Override + public TypeInformation<Row> getProducedType() { + return InLongMsgUtils.decorateRowTypeWithNeededHeadFields(timeFieldName, attributesFieldName, rowFormatInfo); + } + + @Override + public List<Row> convertRows( + Map<String, String> attributes, + byte[] data, + String tid, + Timestamp time, + List<String> predefinedFields, + List<String> fields, + Map<String, String> entries) throws Exception { + Row dataRow = + InLongMsgTlogCsvUtils.deserializeRow( + rowFormatInfo, + nullLiteral, + predefinedFields, + fields); + + Row row = InLongMsgUtils.decorateRowWithNeededHeadFields( + timeFieldName, + attributesFieldName, + time, + attributes, + dataRow); + + return Collections.singletonList(row); + } + + /** + * The builder for {@link InLongMsgTlogCsvMixedFormatConverter}. + * + * <p>All configurable values are taken from the fields of + * {@link InLongMsgMixedFormatConverterBuilder}. 
+ */ + public static class Builder extends InLongMsgMixedFormatConverterBuilder { + + public Builder(RowFormatInfo rowFormatInfo) { + super(rowFormatInfo); + } + + public InLongMsgTlogCsvMixedFormatConverter build() { + return new InLongMsgTlogCsvMixedFormatConverter( + rowFormatInfo, + timeFieldName, + attributesFieldName, + nullLiteral, + ignoreErrors); + } + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + + if (object == null || getClass() != object.getClass()) { + return false; + } + + if (!super.equals(object)) { + return false; + } + + InLongMsgTlogCsvMixedFormatConverter that = (InLongMsgTlogCsvMixedFormatConverter) object; + return rowFormatInfo.equals(that.rowFormatInfo) && + Objects.equals(timeFieldName, that.timeFieldName) && + Objects.equals(attributesFieldName, that.attributesFieldName) && + Objects.equals(nullLiteral, that.nullLiteral); + } + + @Override + public int hashCode() { + /* NOTE(review): equals() also consults super.equals(), but super.hashCode() is not mixed in here, unlike the deserializer classes of this format; confirm against the superclass whether that is intended. */ return Objects.hash(rowFormatInfo, timeFieldName, attributesFieldName, + nullLiteral); + } +} diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java new file mode 100644 index 0000000000..ca0f0b9f86 --- /dev/null +++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+
+import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgTextMixedFormatDeserializerBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
+
+/**
+ * The deserializer for the records in InLongMsgTlogCsv format.
+ *
+ * <p>Every message body is parsed into exactly one {@link InLongMsgBody} and converted into
+ * exactly one mixed row (see {@link #parseBodyList(byte[])} and
+ * {@link #convertRows(InLongMsgHead, InLongMsgBody)}).</p>
+ */
+public final class InLongMsgTlogCsvMixedFormatDeserializer
+        extends
+            AbstractInLongMsgMixedFormatDeserializer {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * The charset of the text.
+     */
+    private final String charset;
+
+    /**
+     * The delimiter between fields.
+     */
+    @Nonnull
+    private final Character delimiter;
+
+    /**
+     * Escape character. Null if escaping is disabled.
+     */
+    @Nullable
+    private final Character escapeChar;
+
+    /**
+     * Quote character. Null if quoting is disabled.
+     */
+    @Nullable
+    private final Character quoteChar;
+
+    /**
+     * Creates a deserializer whose failure handler is the default one selected by
+     * {@code ignoreErrors} (obtained from {@code InLongMsgUtils.getDefaultExceptionHandler}).
+     *
+     * @param charset      The charset used to decode the message body.
+     * @param delimiter    The delimiter between fields.
+     * @param escapeChar   The escape character, or null if escaping is disabled.
+     * @param quoteChar    The quote character, or null if quoting is disabled.
+     * @param ignoreErrors Passed to the default failure handler factory.
+     */
+    public InLongMsgTlogCsvMixedFormatDeserializer(
+            @Nonnull String charset,
+            @Nonnull Character delimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nonnull Boolean ignoreErrors) {
+        this(charset, delimiter, escapeChar, quoteChar, InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
+    }
+
+    /**
+     * Creates a deserializer with an explicit failure handler.
+     *
+     * @param charset        The charset used to decode the message body.
+     * @param delimiter      The delimiter between fields.
+     * @param escapeChar     The escape character, or null if escaping is disabled.
+     * @param quoteChar      The quote character, or null if quoting is disabled.
+     * @param failureHandler The handler handed to the parent deserializer.
+     */
+    public InLongMsgTlogCsvMixedFormatDeserializer(
+            @Nonnull String charset,
+            @Nonnull Character delimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nonnull FailureHandler failureHandler) {
+        super(failureHandler);
+
+        this.delimiter = delimiter;
+        this.charset = charset;
+        this.escapeChar = escapeChar;
+        this.quoteChar = quoteChar;
+    }
+
+    /**
+     * @return {@code InLongMsgUtils.MIXED_ROW_TYPE}, the type of the rows produced here.
+     */
+    @Override
+    public TypeInformation<Row> getProducedType() {
+        return InLongMsgUtils.MIXED_ROW_TYPE;
+    }
+
+    /**
+     * Parses the attribute string of a message by delegating to
+     * {@link InLongMsgTlogCsvUtils#parseHead(String)}.
+     */
+    @Override
+    protected InLongMsgHead parseHead(String attr) throws Exception {
+        return InLongMsgTlogCsvUtils.parseHead(attr);
+    }
+
+    /**
+     * Parses the whole byte array as a single record; the returned list always has one element.
+     */
+    @Override
+    protected List<InLongMsgBody> parseBodyList(byte[] bytes) throws Exception {
+        return Collections.singletonList(
+                InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, escapeChar, quoteChar));
+    }
+
+    /**
+     * Builds one mixed row from the head and body, using the tid stored in the body.
+     */
+    @Override
+    protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) throws Exception {
+        Row row = InLongMsgUtils.buildMixedRow(head, body, body.getTid());
+        return Collections.singletonList(row);
+    }
+
+    /**
+     * The builder for {@link InLongMsgTlogCsvMixedFormatDeserializer}.
+     */
+    public static class Builder extends InLongMsgTextMixedFormatDeserializerBuilder<Builder> {
+
+        // Start from the default delimiter: configure() only calls setDelimiter() when the
+        // property is present, and a null delimiter would fail later when it is unboxed for
+        // InLongMsgTlogCsvUtils.parseBody, which takes a primitive char.
+        private Character delimiter = DEFAULT_DELIMITER;
+
+        /**
+         * Sets the delimiter between fields.
+         *
+         * @param delimiter The delimiter character.
+         * @return This builder.
+         */
+        public Builder setDelimiter(char delimiter) {
+            this.delimiter = delimiter;
+            return this;
+        }
+
+        /**
+         * Applies the parent configuration and then, if present, the delimiter property.
+         *
+         * @param descriptorProperties The properties to read the settings from.
+         * @return This builder.
+         */
+        @Override
+        public Builder configure(DescriptorProperties descriptorProperties) {
+            super.configure(descriptorProperties);
+
+            descriptorProperties.getOptionalCharacter(FORMAT_DELIMITER)
+                    .ifPresent(this::setDelimiter);
+
+            return this;
+        }
+
+        /**
+         * @return A deserializer built from the settings collected so far.
+         */
+        public InLongMsgTlogCsvMixedFormatDeserializer build() {
+            return new InLongMsgTlogCsvMixedFormatDeserializer(
+                    charset,
+                    delimiter,
+                    escapeChar,
+                    quoteChar,
+                    ignoreErrors);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        if (!super.equals(o)) {
+            return false;
+        }
+        InLongMsgTlogCsvMixedFormatDeserializer that = (InLongMsgTlogCsvMixedFormatDeserializer) o;
+        // Null-safe on every field, mirroring Objects.hash in hashCode().
+        return Objects.equals(charset, that.charset) &&
+                Objects.equals(delimiter, that.delimiter) &&
+                Objects.equals(escapeChar, that.escapeChar) &&
+                Objects.equals(quoteChar, that.quoteChar);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), charset, delimiter, escapeChar,
+                quoteChar);
+    }
+}
diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
new file mode 100644
index 0000000000..8a5ef167a2
--- /dev/null
+++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
+import static org.apache.inlong.sort.formats.util.StringUtils.splitCsv;
+
+/**
+ * Utilities for {@link InLongMsgTlogCsv}.
+ */
+public class InLongMsgTlogCsvUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InLongMsgTlogCsvUtils.class);
+
+    /**
+     * Parses the attribute string of a message into an {@link InLongMsgHead}.
+     *
+     * <p>The time is taken from the {@code INLONGMSG_ATTR_TIME_T} attribute (parsed with
+     * {@code parseDateTime}) when present, otherwise from {@code INLONGMSG_ATTR_TIME_DT}
+     * (parsed with {@code parseEpochTime}).</p>
+     *
+     * @param attr The attribute string of the message.
+     * @return The head holding the attributes, the time and the predefined fields.
+     * @throws IllegalArgumentException If neither time attribute is present.
+     */
+    public static InLongMsgHead parseHead(String attr) {
+        Map<String, String> attributes = parseAttr(attr);
+
+        // Extracts time from the attributes
+        Timestamp time;
+
+        if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) {
+            String date = attributes.get(INLONGMSG_ATTR_TIME_T).trim();
+            time = parseDateTime(date);
+        } else if (attributes.containsKey(INLONGMSG_ATTR_TIME_DT)) {
+            String epoch = attributes.get(INLONGMSG_ATTR_TIME_DT).trim();
+            time = parseEpochTime(epoch);
+        } else {
+            throw new IllegalArgumentException(
+                    "Could not find " + INLONGMSG_ATTR_TIME_T +
+                            " or " + INLONGMSG_ATTR_TIME_DT + " in attributes!");
+        }
+
+        // Extracts predefined fields from the attributes
+        List<String> predefinedFields = getPredefinedFields(attributes);
+
+        // The second constructor argument is passed as null here; its meaning is not
+        // visible in this file.
+        return new InLongMsgHead(attributes, null, time, predefinedFields);
+    }
+
+    /**
+     * Parses the body of a message into an {@link InLongMsgBody}.
+     *
+     * <p>A single leading delimiter byte is skipped. The remaining text is split with
+     * {@code splitCsv}; the first segment becomes the tid and the rest become the fields.</p>
+     *
+     * @param bytes      The raw body of the message.
+     * @param charset    The name of the charset used to decode the body.
+     * @param delimiter  The delimiter between fields.
+     * @param escapeChar The escape character, or null if escaping is disabled.
+     * @param quoteChar  The quote character, or null if quoting is disabled.
+     * @return The parsed body; it keeps a reference to the original bytes.
+     * @throws IllegalArgumentException If the body is null or empty, or yields no segment.
+     */
+    public static InLongMsgBody parseBody(
+            byte[] bytes,
+            String charset,
+            char delimiter,
+            Character escapeChar,
+            Character quoteChar) {
+        // Reading bytes[0] on an empty body used to fail with a bare
+        // ArrayIndexOutOfBoundsException; report the actual problem instead.
+        if (bytes == null || bytes.length == 0) {
+            throw new IllegalArgumentException("The body of the InLongMsgTlogCsv record is empty.");
+        }
+
+        Charset bodyCharset = Charset.forName(charset);
+
+        // Drop one leading delimiter so that the tid is always the first segment.
+        int offset = bytes[0] == delimiter ? 1 : 0;
+        String text = new String(bytes, offset, bytes.length - offset, bodyCharset);
+
+        String[] segments = splitCsv(text, delimiter, escapeChar, quoteChar);
+        // What splitCsv returns for an empty text is not visible here; guard defensively.
+        if (segments.length == 0) {
+            throw new IllegalArgumentException("The body of the InLongMsgTlogCsv record contains no tid.");
+        }
+
+        String tid = segments[0];
+        List<String> fields =
+                Arrays.stream(segments, 1, segments.length).collect(Collectors.toList());
+
+        return new InLongMsgBody(bytes, tid, fields, Collections.emptyMap());
+    }
+
+    /**
+     * Deserializes the given fields into the row.
+     *
+     * <p>The predefined fields fill the leading columns and the body fields the following
+     * ones. Surplus input fields are dropped and missing trailing columns are set to null;
+     * a mismatch in the number of fields is only logged as a warning.</p>
+     *
+     * @param rowFormatInfo The format information of the row.
+     * @param nullLiteral The literal for null values.
+     * @param predefinedFields The predefined fields.
+     * @param fields The fields.
+     * @return The row deserialized from the predefined fields and the fields.
+     */
+    public static Row deserializeRow(
+            RowFormatInfo rowFormatInfo,
+            String nullLiteral,
+            List<String> predefinedFields,
+            List<String> fields) {
+        String[] fieldNames = rowFormatInfo.getFieldNames();
+        FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+
+        int numPredefinedFields = predefinedFields.size();
+        int actualNumFields = numPredefinedFields + fields.size();
+        if (actualNumFields != fieldNames.length) {
+            LOG.warn("The number of fields mismatches: {} expected, but was {}.",
+                    fieldNames.length, actualNumFields);
+        }
+
+        Row row = new Row(fieldNames.length);
+
+        // Only as many input fields as the row has columns are converted.
+        int numConvertedFields = Math.min(actualNumFields, fieldNames.length);
+        for (int i = 0; i < numConvertedFields; ++i) {
+            String fieldText =
+                    i < numPredefinedFields
+                            ? predefinedFields.get(i)
+                            : fields.get(i - numPredefinedFields);
+
+            Object field =
+                    deserializeBasicField(
+                            fieldNames[i],
+                            fieldFormatInfos[i],
+                            fieldText,
+                            nullLiteral);
+            row.setField(i, field);
+        }
+
+        // Columns without a matching input field are explicitly set to null.
+        for (int i = numConvertedFields; i < fieldNames.length; ++i) {
+            row.setField(i, null);
+        }
+
+        return row;
+    }
+}
diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvValidator.java b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvValidator.java
new file mode 100644
index 0000000000..1e9f297004
--- /dev/null
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvValidator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+
+import org.apache.inlong.sort.formats.base.TextFormatDescriptorValidator;
+
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateInLongMsgSchema;
+
+/**
+ * The validator for {@link InLongMsgTlogCsv}.
+ *
+ * <p>Runs the text-format validation of the parent class, then checks the delimiter
+ * property, then the InLongMsg schema properties.</p>
+ */
+public class InLongMsgTlogCsvValidator extends TextFormatDescriptorValidator {
+
+    /**
+     * Validates the given format properties.
+     *
+     * @param properties The descriptor properties to validate.
+     */
+    @Override
+    public void validate(DescriptorProperties properties) {
+        super.validate(properties);
+
+        // NOTE(review): presumably (key, isOptional, minLen, maxLen), i.e. the delimiter is
+        // optional and must be exactly one character when given - confirm against Flink's
+        // DescriptorProperties#validateString.
+        properties.validateString(FORMAT_DELIMITER, true, 1, 1);
+
+        validateInLongMsgSchema(properties);
+    }
+}
diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000000..a5531edfdb
--- /dev/null
+++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +org.apache.inlong.sort.formats.inlongmsgtlogcsv.InLongMsgTlogCsvFormatFactory diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java new file mode 100644 index 0000000000..4060fda847 --- /dev/null +++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+
+import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
+import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link InLongMsgTlogCsvFormatDeserializer}.
+ *
+ * <p>Each test packs one or more bodies into an {@link InLongMsg}, runs the deserializer's
+ * {@code flatMap} over the serialized message and compares the collected rows (or the
+ * failure counters) with the expectation.</p>
+ */
+public class InLongMsgTlogCsvFormatDeserializerTest {
+
+    // Row schema shared by all tests: two int columns followed by three string columns.
+    private static final RowFormatInfo TEST_ROW_INFO =
+            new RowFormatInfo(
+                    new String[]{"f1", "f2", "f3", "f4", "f5"},
+                    new FormatInfo[]{
+                            IntFormatInfo.INSTANCE,
+                            IntFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE
+                    });
+
+    /**
+     * Checks that failures are routed to the configured {@link FailureHandler}.
+     */
+    @Test
+    public void testExceptionHandler() throws Exception {
+        TestFailureHandler errorHandler = new TestFailureHandler();
+        InLongMsgTlogCsvFormatDeserializer deserializer =
+                new InLongMsgTlogCsvFormatDeserializer(
+                        TEST_ROW_INFO,
+                        DEFAULT_TIME_FIELD_NAME,
+                        DEFAULT_ATTRIBUTES_FIELD_NAME,
+                        DEFAULT_CHARSET,
+                        DEFAULT_DELIMITER,
+                        null,
+                        null,
+                        null,
+                        errorHandler);
+
+        // "__addcol2_=test" is paired with the int column f2; presumably that is what makes
+        // the row conversion fail once per body - confirm against the deserializer.
+        InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
+        String attrs = "m=0&dt=1584806400000&__addcol1_=1&__addcol2_=test";
+        String body1 = "interfaceId1,field1,field2,field3";
+        String body2 = "interfaceId2,field1,field2,field3";
+        inLongMsg1.addMsg(attrs, body1.getBytes());
+        inLongMsg1.addMsg(attrs, body2.getBytes());
+
+        List<Row> actualRows = new ArrayList<>();
+        Collector<Row> collector = new ListCollector<>(actualRows);
+        deserializer.flatMap(inLongMsg1.buildArray(), collector);
+        assertEquals(2, errorHandler.getRowCount());
+
+        // These attributes carry neither "dt" nor "t"; one head failure is expected for the
+        // two bodies sharing them.
+        InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
+        String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=test";
+        inLongMsg1Head.addMsg(abNormalAttrs, body1.getBytes());
+        inLongMsg1Head.addMsg(abNormalAttrs, body2.getBytes());
+        deserializer.flatMap(inLongMsg1Head.buildArray(), collector);
+        assertEquals(1, errorHandler.getHeadCount());
+    }
+
+    /**
+     * Two well-formed bodies produce two rows; the leading interface id is not part of the row.
+     */
+    @Test
+    public void testNormal() throws Exception {
+        InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
+        String attrs = "m=0&dt=1584806400000&__addcol1_=1&__addcol2_=2";
+        String body1 = "interfaceId1,field1,field2,field3";
+        String body2 = "interfaceId2,field1,field2,field3";
+        inLongMsg1.addMsg(attrs, body1.getBytes());
+        inLongMsg1.addMsg(attrs, body2.getBytes());
+
+        Map<String, String> expectedAttributes = new HashMap<>();
+        expectedAttributes.put("m", "0");
+        expectedAttributes.put("dt", "1584806400000");
+        expectedAttributes.put("__addcol1_", "1");
+        expectedAttributes.put("__addcol2_", "2");
+
+        Row expectedRow1 = Row.of(
+                new Timestamp(1584806400000L),
+                expectedAttributes,
+                1,
+                2,
+                "field1",
+                "field2",
+                "field3");
+
+        Row expectedRow2 = Row.of(
+                new Timestamp(1584806400000L),
+                expectedAttributes,
+                1,
+                2,
+                "field1",
+                "field2",
+                "field3");
+
+        testRowDeserialization(
+                inLongMsg1.buildArray(),
+                Arrays.asList(expectedRow1, expectedRow2));
+    }
+
+    /**
+     * A body that starts with the delimiter yields the same row as one without it.
+     */
+    @Test
+    public void testDeleteHeadDelimiter() throws Exception {
+        InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
+        String attrs = "m=0&dt=1584806400000&__addcol1_=1&__addcol2_=2";
+        String body = ",interfaceId,field1,field2,field3";
+        inLongMsg1.addMsg(attrs, body.getBytes());
+
+        Map<String, String> expectedAttributes = new HashMap<>();
+        expectedAttributes.put("m", "0");
+        expectedAttributes.put("dt", "1584806400000");
+        expectedAttributes.put("__addcol1_", "1");
+        expectedAttributes.put("__addcol2_", "2");
+
+        Row expectedRow = Row.of(
+                new Timestamp(1584806400000L),
+                expectedAttributes,
+                1,
+                2,
+                "field1",
+                "field2",
+                "field3");
+
+        testRowDeserialization(
+                inLongMsg1.buildArray(),
+                Collections.singletonList(expectedRow));
+    }
+
+    /**
+     * Too few body fields leave the trailing column null; surplus body fields are dropped.
+     */
+    @Test
+    public void testUnmatchedFields1() throws Exception {
+        InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
+        String attrs = "m=0&dt=1584806400000&__addcol1_=1&__addcol2_=2";
+        String body1 = "interfaceId1,field1,field2";
+        String body2 = "interfaceId2,field1,field2,field3,field4";
+        inLongMsg1.addMsg(attrs, body1.getBytes());
+        inLongMsg1.addMsg(attrs, body2.getBytes());
+
+        Map<String, String> expectedAttributes = new HashMap<>();
+        expectedAttributes.put("m", "0");
+        expectedAttributes.put("dt", "1584806400000");
+        expectedAttributes.put("__addcol1_", "1");
+        expectedAttributes.put("__addcol2_", "2");
+
+        Row expectedRow1 = Row.of(
+                new Timestamp(1584806400000L),
+                expectedAttributes,
+                1,
+                2,
+                "field1",
+                "field2",
+                null);
+
+        Row expectedRow2 = Row.of(
+                new Timestamp(1584806400000L),
+                expectedAttributes,
+                1,
+                2,
+                "field1",
+                "field2",
+                "field3");
+
+        testRowDeserialization(
+                inLongMsg1.buildArray(),
+                Arrays.asList(expectedRow1, expectedRow2));
+    }
+
+    /**
+     * More predefined ("__addcolN_") attributes than columns: the columns are filled from the
+     * predefined values only, and the body fields do not appear in the expected row.
+     */
+    @Test
+    public void testUnmatchedFields2() throws Exception {
+        InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
+        String attrs = "m=0&dt=1584806400000&__addcol1_=1&__addcol2_=2&" +
+                "__addcol3_=3&__addcol4_=4&__addcol5_=5&__addcol6_=6";
+        String body = "interfaceId1,field1,field2,field3";
+        inLongMsg1.addMsg(attrs, body.getBytes());
+
+        Map<String, String> expectedAttributes = new HashMap<>();
+        expectedAttributes.put("m", "0");
+        expectedAttributes.put("dt", "1584806400000");
+        expectedAttributes.put("__addcol1_", "1");
+        expectedAttributes.put("__addcol2_", "2");
+        expectedAttributes.put("__addcol3_", "3");
+        expectedAttributes.put("__addcol4_", "4");
+        expectedAttributes.put("__addcol5_", "5");
+        expectedAttributes.put("__addcol6_", "6");
+
+        Row expectedRow = Row.of(
+                new Timestamp(1584806400000L),
+                expectedAttributes,
+                1,
+                2,
+                "3",
+                "4",
+                "5");
+
+        testRowDeserialization(
+                inLongMsg1.buildArray(),
+                Collections.singletonList(expectedRow));
+    }
+
+    /**
+     * Deserializes {@code bytes} with a builder-created deserializer over TEST_ROW_INFO and
+     * asserts that exactly {@code expectedRows} are emitted, in order.
+     */
+    private void testRowDeserialization(
+            byte[] bytes,
+            List<Row> expectedRows) throws Exception {
+        InLongMsgTlogCsvFormatDeserializer deserializer =
+                new InLongMsgTlogCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+                        .setTimeFieldName("inlongmsg_time")
+                        .setAttributesFieldName("inlongmsg_attributes")
+                        .build();
+
+        List<Row> actualRows = new ArrayList<>();
+        Collector<Row> collector = new ListCollector<>(actualRows);
+
+        deserializer.flatMap(bytes, collector);
+
+        assertEquals(expectedRows, actualRows);
+    }
+
+    /**
+     * A {@link FailureHandler} that swallows every failure and only counts it per stage.
+     */
+    private static class TestFailureHandler implements FailureHandler {
+
+        private int headCount = 0;
+        private int bodyCount = 0;
+        private int rowCount = 0;
+
+        public int getHeadCount() {
+            return headCount;
+        }
+
+        public int getBodyCount() {
+            return bodyCount;
+        }
+
+        public int getRowCount() {
+            return rowCount;
+        }
+
+        @Override
+        public void onParsingHeadFailure(String attribute, Exception exception) throws Exception {
+            headCount++;
+        }
+
+        @Override
+        public void onParsingBodyFailure(byte[] body, Exception exception) throws Exception {
+            bodyCount++;
+        }
+
+        @Override
+        public void onConvertingRowFailure(InLongMsgHead head,
+                InLongMsgBody body, Exception exception) throws Exception {
+            rowCount++;
+        }
+    }
+}
diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactoryTest.java b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactoryTest.java
new file mode 100644
index 0000000000..10e2fbcbcd
--- /dev/null
+++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactoryTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+
+import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
+import org.apache.inlong.sort.formats.base.TableFormatForRowUtils;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.FormatUtils;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.types.Row;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for {@link InLongMsgTlogCsvFormatFactory}.
+ */
+public class InLongMsgTlogCsvFormatFactoryTest {
+
+    // Table schema used by the derivation test: time and attributes columns followed by the
+    // three data columns of TEST_FORMAT_SCHEMA.
+    private static final TypeInformation<Row> SCHEMA =
+            Types.ROW(
+                    new String[]{"time", "attributes", "student_name", "score", "date"},
+                    new TypeInformation[]{
+                            Types.SQL_TIMESTAMP(),
+                            Types.MAP(Types.STRING(), Types.STRING()),
+                            Types.STRING(),
+                            Types.INT(),
+                            Types.SQL_DATE()
+                    });
+
+    // Format schema of the data columns only.
+    private static final RowFormatInfo TEST_FORMAT_SCHEMA =
+            new RowFormatInfo(
+                    new String[]{"student_name", "score", "date"},
+                    new FormatInfo[]{
+                            StringFormatInfo.INSTANCE,
+                            IntFormatInfo.INSTANCE,
+                            new DateFormatInfo("yyyy-MM-dd")
+                    });
+
+    /**
+     * The deserializer created from the descriptor's properties must equal one constructed
+     * directly with the same settings.
+     */
+    @Test
+    public void testCreateTableFormatDeserializer() throws Exception {
+        final Map<String, String> properties =
+                new InLongMsgTlogCsv()
+                        .schema(FormatUtils.marshall(TEST_FORMAT_SCHEMA))
+                        .delimiter(';')
+                        .charset(StandardCharsets.ISO_8859_1)
+                        .escapeCharacter('\\')
+                        .quoteCharacter('\"')
+                        .nullLiteral("null")
+                        .toProperties();
+        assertNotNull(properties);
+
+        final InLongMsgTlogCsvFormatDeserializer expectedDeser =
+                new InLongMsgTlogCsvFormatDeserializer(
+                        TEST_FORMAT_SCHEMA,
+                        InLongMsgUtils.DEFAULT_TIME_FIELD_NAME,
+                        InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME,
+                        StandardCharsets.ISO_8859_1.name(),
+                        ';',
+                        '\\',
+                        '\"',
+                        "null",
+                        false);
+
+        final TableFormatDeserializer actualDeser =
+                TableFormatForRowUtils.getTableFormatDeserializer(
+                        properties,
+                        getClass().getClassLoader());
+
+        assertEquals(expectedDeser, actualDeser);
+    }
+
+    /**
+     * Creating a deserializer from a derived schema is expected to throw.
+     *
+     * <p>NOTE(review): the test passes on any exception from any statement below, so the
+     * trailing assertEquals is presumably never reached - confirm which call is meant to fail.</p>
+     */
+    @Test(expected = Exception.class)
+    public void testCreateTableFormatDeserializerWithDerivation() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.putAll(
+                new Schema()
+                        .schema(TableSchema.fromTypeInfo(SCHEMA))
+                        .toProperties());
+        properties.putAll(new InLongMsgTlogCsv().deriveSchema().toProperties());
+
+        final InLongMsgTlogCsvFormatDeserializer expectedDeser =
+                new InLongMsgTlogCsvFormatDeserializer.Builder(TEST_FORMAT_SCHEMA).build();
+
+        final TableFormatDeserializer actualDeser =
+                TableFormatForRowUtils.getTableFormatDeserializer(
+                        properties,
+                        getClass().getClassLoader());
+
+        assertEquals(expectedDeser, actualDeser);
+    }
+}
diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvTest.java b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvTest.java
new file mode 100644
index 0000000000..618ff8e2a7
--- /dev/null
+++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
+
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgValidator;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.descriptors.Descriptor;
+import org.apache.flink.table.descriptors.DescriptorTestBase;
+import org.apache.flink.table.descriptors.DescriptorValidator;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for the {@link InLongMsgTlogCsv} descriptor.
+ */
+public class InLongMsgTlogCsvTest extends DescriptorTestBase {
+
+    // JSON form of a row schema with the columns student_name (string), score (int) and
+    // date (date, "yyyy-MM-dd").
+    private static final String TEST_SCHEMA =
+            "{" +
+                    "\"type\":\"row\"," +
+                    "\"fieldFormats\":[{" +
+                    "\"name\":\"student_name\"," +
+                    "\"format\":{\"type\":\"string\"}" +
+                    "},{" +
+                    "\"name\":\"score\"," +
+                    "\"format\":{\"type\":\"int\"}" +
+                    "},{" +
+                    "\"name\":\"date\"," +
+                    "\"format\":{" +
+                    "\"type\":\"date\"," +
+                    "\"format\":\"yyyy-MM-dd\"" +
+                    "}" +
+                    "}]" +
+                    "}";
+
+    // Descriptor with every option set; properties() lists the properties it must produce.
+    private static final Descriptor CUSTOM_DESCRIPTOR_WITH_SCHEMA =
+            new InLongMsgTlogCsv()
+                    .schema(TEST_SCHEMA)
+                    .timeFieldName("time")
+                    .attributesFieldName("attributes")
+                    .delimiter(';')
+                    .charset(StandardCharsets.ISO_8859_1)
+                    .escapeCharacter('\\')
+                    .quoteCharacter('\"')
+                    .nullLiteral("n/a")
+                    .ignoreErrors();
+
+    /**
+     * A multi-character escape character must be rejected.
+     *
+     * <p>NOTE(review): despite its name, this test sets "format.escape-character", not the
+     * ignore-errors option.</p>
+     */
+    @Test(expected = ValidationException.class)
+    public void testInvalidIgnoreParseErrors() {
+        addPropertyAndVerify(CUSTOM_DESCRIPTOR_WITH_SCHEMA, "format.escape-character", "DDD");
+    }
+
+    /**
+     * Removing the schema property must be rejected.
+     */
+    @Test(expected = ValidationException.class)
+    public void testMissingSchema() {
+        removePropertyAndVerify(CUSTOM_DESCRIPTOR_WITH_SCHEMA, "format.schema");
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * @return The descriptors under test.
+     */
+    @Override
+    public List<Descriptor> descriptors() {
+        return Collections.singletonList(CUSTOM_DESCRIPTOR_WITH_SCHEMA);
+    }
+
+    /**
+     * @return The expected properties, one map per descriptor returned by descriptors().
+     */
+    @Override
+    public List<Map<String, String>> properties() {
+        final Map<String, String> props1 = new HashMap<>();
+        props1.put("format.type", "InLongMsg-TLOGCSV");
+        props1.put("format.property-version", "1");
+        props1.put("format.schema", TEST_SCHEMA);
+        props1.put("format.time-field-name", "time");
+        props1.put("format.attributes-field-name", "attributes");
+        props1.put("format.delimiter", ";");
+        props1.put("format.charset", "ISO-8859-1");
+        props1.put("format.escape-character", "\\");
+        props1.put("format.quote-character", "\"");
+        props1.put("format.null-literal", "n/a");
+        props1.put("format.ignore-errors", "true");
+
+        return Collections.singletonList(props1);
+    }
+
+    /**
+     * @return The validator the descriptor properties are checked with.
+     */
+    @Override
+    public DescriptorValidator validator() {
+        // NOTE(review): this uses the generic InLongMsgValidator rather than
+        // InLongMsgTlogCsvValidator - confirm that this is intended.
+        return new InLongMsgValidator();
+    }
+}
diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/resources/log4j-test.properties b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000000..881dc0609b
--- /dev/null
+++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF and its only appender to A1. +log4j.rootLogger=OFF, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/inlong-sort/sort-formats/format-row/pom.xml b/inlong-sort/sort-formats/format-row/pom.xml index 4b74124162..8fde4b1512 100644 --- a/inlong-sort/sort-formats/format-row/pom.xml +++ b/inlong-sort/sort-formats/format-row/pom.xml @@ -40,6 +40,7 @@ <module>format-inlongmsg-csv</module> <module>format-inlongmsg-kv</module> <module>format-inlongmsg-pb</module> + <module>format-inlongmsg-tlogcsv</module> </modules> <properties>
