robertwb commented on code in PR #32366: URL: https://github.com/apache/beam/pull/32366#discussion_r1739389089
########## sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java: ########## @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A utility that filters fields from Beam {@link Row}s. This filter can be configured to indicate + * what fields you would like to either <strong>keep</strong> or <strong>drop</strong>. Afterward, Review Comment: What about the unnesting `only` option? 
########## sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java: ########## @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A utility that filters fields from Beam {@link Row}s. This filter can be configured to indicate + * what fields you would like to either <strong>keep</strong> or <strong>drop</strong>. Afterward, + * call {@link #filter(Row)} on a Schema-compatible Row to filter it. An un-configured filter will + * simply return the input row untouched. 
+ * + * <p>Nested fields can be expressed using dot-notation (e.g. {@code "top.middle.nested"}). + * + * <p>A configured {@link RowFilter} will naturally produce {@link Row}s with a new Beam {@link + * Schema}. You can access this new Schema ahead of time via the filter's {@link #outputSchema()}. + * + * <p>Configure a {@link RowFilter} as follows: + * + * <pre>{@code + * // this is an un-configured filter + * RowFilter unconfigured = new RowFilter(beamSchema); + * + * List<String> fields = Arrays.asList("foo", "bar.xyz", "baz.abc.qwe"); + * + * // this filter will exclusively keep these fields and drop everything else + * RowFilter keepingFilter = new RowFilter(beamSchema).keeping(fields); + * + * // this filter will drop these fields + * RowFilter droppingFilter = new RowFilter(beamSchema).dropping(fields); + * + * // produces a filtered row + * Row outputRow = keepingFilter.filter(row); + * }</pre> + * + * Check the documentation for {@link #keeping(List)} and {@link #dropping(List)} for further + * details on what a filtered Row can look like. + */ +public class RowFilter implements Serializable { + private final Schema rowSchema; + private @Nullable Schema transformedSchema; + + public RowFilter(Schema rowSchema) { + this.rowSchema = rowSchema; + } + + /** + * Checks whether a {@link Schema} contains a list of field names. Nested fields can be expressed + * with dot-notation. Throws a helpful error in the case where a field doesn't exist, or if a + * nested field could not be reached. + */ + @VisibleForTesting Review Comment: Nit: Perhaps put the helper methods lower to keep the public API more visible/together. ########## sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java: ########## @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A utility that filters fields from Beam {@link Row}s. This filter can be configured to indicate + * what fields you would like to either <strong>keep</strong> or <strong>drop</strong>. Afterward, + * call {@link #filter(Row)} on a Schema-compatible Row to filter it. An un-configured filter will + * simply return the input row untouched. + * + * <p>Nested fields can be expressed using dot-notation (e.g. {@code "top.middle.nested"}). + * + * <p>A configured {@link RowFilter} will naturally produce {@link Row}s with a new Beam {@link + * Schema}. You can access this new Schema ahead of time via the filter's {@link #outputSchema()}. 
+ * + * <p>Configure a {@link RowFilter} as follows: + * + * <pre>{@code + * // this is an un-configured filter + * RowFilter unconfigured = new RowFilter(beamSchema); + * + * List<String> fields = Arrays.asList("foo", "bar.xyz", "baz.abc.qwe"); + * + * // this filter will exclusively keep these fields and drop everything else + * RowFilter keepingFilter = new RowFilter(beamSchema).keeping(fields); + * + * // this filter will drop these fields + * RowFilter droppingFilter = new RowFilter(beamSchema).dropping(fields); + * + * // produces a filtered row + * Row outputRow = keepingFilter.filter(row); + * }</pre> + * + * Check the documentation for {@link #keeping(List)} and {@link #dropping(List)} for further + * details on what a filtered Row can look like. + */ +public class RowFilter implements Serializable { + private final Schema rowSchema; + private @Nullable Schema transformedSchema; + + public RowFilter(Schema rowSchema) { + this.rowSchema = rowSchema; + } + + /** + * Checks whether a {@link Schema} contains a list of field names. Nested fields can be expressed + * with dot-notation. Throws a helpful error in the case where a field doesn't exist, or if a + * nested field could not be reached. 
+ */ + @VisibleForTesting + static void validateSchemaContainsFields( + Schema schema, List<String> specifiedFields, String operation) { + Set<String> notFound = new HashSet<>(); + Set<String> notRowField = new HashSet<>(); + + for (String field : specifiedFields) { + List<String> levels = Splitter.on(".").splitToList(field); + + Schema currentSchema = schema; + + for (int i = 0; i < levels.size(); i++) { + String currentFieldName = String.join(".", levels.subList(0, i + 1)); + + if (!currentSchema.hasField(levels.get(i))) { + notFound.add(currentFieldName); + break; + } + + if (i + 1 < levels.size()) { + Schema.Field nextField = currentSchema.getField(levels.get(i)); + if (!nextField.getType().getTypeName().equals(Schema.TypeName.ROW)) { + notRowField.add(currentFieldName); + break; + } + currentSchema = Preconditions.checkNotNull(nextField.getType().getRowSchema()); + } + } + } + + if (!notFound.isEmpty() || !notRowField.isEmpty()) { + String message = "Validation failed for " + operation + "."; + if (!notFound.isEmpty()) { + message += "\nRow Schema does not contain the following specified fields: " + notFound; + } + if (!notRowField.isEmpty()) { + message += + "\nThe following specified fields are not of type Row. Their nested fields could not be reached: " + + notRowField; + } + throw new IllegalArgumentException(message); + } + } + + /** + * Configures this {@link RowFilter} to filter {@link Row}s by keeping only the specified fields. + * Nested fields can be specified using dot-notation. 
+ * + * <p>For example, if we want to keep the list of fields {@code ["foo", "baz.nested_1"]}, for the + * input {@link Row}: + * + * <pre>{@code + * foo: 123 + * bar: 456 + * baz: + * nested_1: abc + * nested_2: xyz + * }</pre> + * + * we will get the following output {@link Row}: + * + * <pre>{@code + * foo: 123 + * baz + * nested_1: abc + * }</pre> + */ + public RowFilter keeping(List<String> fields) { + Preconditions.checkState( + transformedSchema == null, + "This RowFilter has already been configured to filter to the following Schema: %s", + transformedSchema); + validateSchemaContainsFields(rowSchema, fields, "\"keep\""); + transformedSchema = keepFields(rowSchema, fields); + return this; + } + + /** + * Configures this {@link RowFilter} to filter {@link Row} by removing the specified fields. + * Nested fields can be specified using dot-notation. Review Comment: Do we have a clear use case for this? I'm not convinced it's worth the complexity (not just here, but for other transforms (possibly in other languages) that might also adopt these conventions, and further other tools that might want to do things like validation). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
