georgecma commented on code in PR #25831:
URL: https://github.com/apache/beam/pull/25831#discussion_r1139080825
##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -765,4 +796,195 @@ public void populateDisplayData(DisplayData.Builder
builder) {
private transient BufferedMutator mutator;
}
}
+
+ /**
+  * Creates a {@link WriteRowMutations} with no configuration and an empty table id. Both must be
+  * supplied before the transform is expanded.
+  */
+ public static WriteRowMutations writeRowMutations() {
+   final Configuration unsetConfiguration = null;
+   final String unsetTableId = "";
+   return new WriteRowMutations(unsetConfiguration, unsetTableId);
+ }
+
+ /** Transformation that writes RowMutations objects to an HBase table. */
+ public static class WriteRowMutations
+ extends PTransform<PCollection<KV<byte[], RowMutations>>,
PCollection<Integer>> {
+
+ /**
+  * Returns a new transform that writes to the HBase instance indicated by the given
+  * Configuration, keeping the current table id.
+  *
+  * @param configuration the HBase configuration; must not be null
+  */
+ public WriteRowMutations withConfiguration(Configuration configuration) {
+   checkNotNull(configuration, "configuration cannot be null");
+   final WriteRowMutations reconfigured = new WriteRowMutations(configuration, this.tableId);
+   return reconfigured;
+ }
+
+ /**
+  * Returns a new transform that writes to the specified table, keeping the current configuration.
+  *
+  * @param tableId the name of the target table; must not be null
+  */
+ public WriteRowMutations withTableId(String tableId) {
+   checkNotNull(tableId, "tableId cannot be null");
+   final WriteRowMutations retargeted = new WriteRowMutations(this.configuration, tableId);
+   return retargeted;
+ }
+
+ /**
+  * Stores the given values as-is. Nothing is validated here; {@code expand} checks both fields
+  * before the transform is applied.
+  */
+ private WriteRowMutations(Configuration configuration, String tableId) {
+   this.tableId = tableId;
+   this.configuration = configuration;
+ }
+
+ /**
+  * Checks that a configuration and a non-empty table id have been set, then applies a
+  * {@code ParDo} running {@code WriteRowMutationsFn} over the input.
+  */
+ @Override
+ public PCollection<Integer> expand(PCollection<KV<byte[], RowMutations>> input) {
+   checkNotNull(configuration, "withConfiguration() is required");
+   checkNotNull(tableId, "withTableId() is required");
+   checkArgument(!tableId.isEmpty(), "withTableId() cannot be empty");
+
+   final WriteRowMutationsFn writeFn = new WriteRowMutationsFn(this);
+   return input.apply(ParDo.of(writeFn));
+ }
+
+ /** Adds the configuration's string form and the table id to the display data. */
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+   super.populateDisplayData(builder);
+
+   final String configurationText = configuration.toString();
+   builder.add(DisplayData.item("configuration", configurationText));
+   builder.add(DisplayData.item("tableId", tableId));
+ }
+
+ /** Returns the configuration this transform was built with; null until one is set. */
+ public Configuration getConfiguration() {
+   return this.configuration;
+ }
+
+ /** Returns the table id this transform was built with; empty until one is set. */
+ public String getTableId() {
+   return this.tableId;
+ }
+
+ /**
+  * Two transforms are equal when their configurations have the same string form and their table
+  * ids are equal.
+  *
+  * <p>The configuration may be null (that is the state produced by {@code writeRowMutations()}),
+  * so the string forms are derived null-safely instead of calling {@code toString()} directly.
+  */
+ @Override
+ public boolean equals(@Nullable Object o) {
+   if (this == o) {
+     return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+     return false;
+   }
+   WriteRowMutations other = (WriteRowMutations) o;
+   String thisConfiguration = configuration == null ? null : configuration.toString();
+   String otherConfiguration =
+       other.configuration == null ? null : other.configuration.toString();
+   return Objects.equals(thisConfiguration, otherConfiguration)
+       && Objects.equals(tableId, other.tableId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(configuration, tableId);
+ }
+
+ /**
+ * The writeReplace method allows the developer to provide a replacement
object that will be
+ * serialized instead of the original one. We use this to keep the
enclosed class immutable. For
+ * more details on the technique see <a
+ *
href="https://lingpipe-blog.com/2009/08/10/serializing-immutable-singletons-serialization-proxy/">this
+ * article</a>.
+ */
+ private Object writeReplace() {
+ return new SerializationProxy(this);
+ }
+
+ /**
+  * Serialized stand-in for {@code WriteRowMutations}. It carries the configuration and table id,
+  * writes them with Beam coders, and on deserialization resolves back to a transform built
+  * through the public factory methods.
+  */
+ private static class SerializationProxy implements Serializable {
+   // Not final: readObject assigns both fields during deserialization.
+   private Configuration configuration;
+   private String tableId;
+
+   public SerializationProxy() {}
+
+   public SerializationProxy(WriteRowMutations writeRowMutations) {
+     this.configuration = writeRowMutations.configuration;
+     this.tableId = writeRowMutations.tableId;
+   }
+
+   /** Writes the configuration (wrapped in a SerializableConfiguration), then the table id. */
+   private void writeObject(ObjectOutputStream out) throws IOException {
+     final SerializableConfiguration wrapped = new SerializableConfiguration(configuration);
+     SerializableCoder.of(SerializableConfiguration.class).encode(wrapped, out);
+
+     StringUtf8Coder.of().encode(tableId, out);
+   }
+
+   /** Reads the fields back in the same order writeObject wrote them. */
+   private void readObject(ObjectInputStream in) throws IOException {
+     final SerializableConfiguration wrapped =
+         SerializableCoder.of(SerializableConfiguration.class).decode(in);
+     configuration = wrapped.get();
+     tableId = StringUtf8Coder.of().decode(in);
+   }
+
+   /** Replaces the deserialized proxy with an equivalent WriteRowMutations. */
+   Object readResolve() {
+     final WriteRowMutations unconfigured = HBaseIO.writeRowMutations();
+     return unconfigured.withConfiguration(configuration).withTableId(tableId);
+   }
+ }
+
+ private final Configuration configuration;
+ private final String tableId;
+
+ /** Function to write row mutations to an HBase table. */
+ private class WriteRowMutationsFn extends DoFn<KV<byte[], RowMutations>,
Integer> {
+
+ /**
+  * Validates that the given transform has a table id and a configuration. The argument is only
+  * checked here, not stored.
+  */
+ public WriteRowMutationsFn(WriteRowMutations writeRowMutations) {
+   checkNotNull(writeRowMutations.tableId, "tableId");
+   checkNotNull(writeRowMutations.configuration, "configuration");
+ }
+
+ /**
+  * Obtains the HBase connection for this DoFn instance from {@code HBaseSharedConnection},
+  * passing {@code configuration}.
+  */
+ // NOTE(review): presumably getOrCreate hands out a shared connection that close() in the
+ // teardown method releases -- confirm against HBaseSharedConnection.
+ @Setup
+ public void setup() throws Exception {
+ connection = HBaseSharedConnection.getOrCreate(configuration);
+ }
+
+ /** Opens the target table for this bundle and resets the written-records counter. */
+ @StartBundle
+ public void startBundle(StartBundleContext c) throws IOException {
+   final TableName targetTable = TableName.valueOf(tableId);
+   table = connection.getTable(targetTable);
+   recordsWritten = 0;
+ }
+
+ /** Closes the bundle's table, if one is open, and logs the written-records counter. */
+ @FinishBundle
+ public void finishBundle() throws Exception {
+   final boolean tableIsOpen = table != null;
+   if (tableIsOpen) {
+     table.close();
+     table = null;
+   }
+
+   LOG.debug("Wrote {} records", recordsWritten);
+ }
+
+ /**
+  * Closes any table still open and then releases this DoFn's use of the shared connection.
+  *
+  * <p>{@code HBaseSharedConnection.close()} runs in a {@code finally} block so that a failure
+  * while closing the table cannot skip it and leave the shared connection unreleased.
+  *
+  * @throws Exception if closing the table or the shared connection fails
+  */
+ @Teardown
+ public void tearDown() throws Exception {
+   try {
+     if (table != null) {
+       table.close();
+       table = null;
+     }
+   } finally {
+     HBaseSharedConnection.close();
+   }
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ RowMutations mutations = c.element().getValue();
+
+ try {
+ // Use Table instead of BufferedMutator to preserve mutation-ordering
+ table.mutateRow(mutations);
+ } catch (Exception e) {
+ throw new Exception(
+ (String.join(
+ " ",
+ "Table",
+ tableId,
+ "row",
+ Bytes.toString(mutations.getRow()),
+ "mutation failed.",
+ "\nTable Available/Enabled:",
+ Boolean.toString(
+
connection.getAdmin().isTableAvailable(TableName.valueOf(tableId))),
+ Boolean.toString(
+
connection.getAdmin().isTableEnabled(TableName.valueOf(tableId))),
+ "\nConnection Closed/Aborted/Locks:",
+ Boolean.toString(connection.isClosed()),
+ Boolean.toString(connection.isAborted()))));
+ }
+
+ // Dummy output so that we can get Dataflow stats for throughput.
+ c.output(1);
Review Comment:
Using PDone for now
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]