This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 04eff7d Integrate BigQuery sink file loads with GroupIntoBatches
new dc29b8c Merge pull request #13859 from [BEAM-11772] Integrate
BigQuery sink file loads with GroupIntoBatches
04eff7d is described below
commit 04eff7db83c64609ee2f27abb0003231c53b9c0b
Author: sychen <[email protected]>
AuthorDate: Sun Jan 31 11:49:07 2021 -0800
Integrate BigQuery sink file loads with GroupIntoBatches
---
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 130 ++++++++++++++++-----
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 17 ++-
.../gcp/bigquery/WriteGroupedRecordsToFiles.java | 21 ++--
.../sdk/io/gcp/testing/FakeDatasetService.java | 10 ++
.../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 69 +++++++++++
5 files changed, 200 insertions(+), 47 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index f804131..1828192 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -47,9 +47,12 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
@@ -265,31 +268,44 @@ class BatchLoads<DestinationT, ElementT>
// Expand the pipeline when the user has requested periodically-triggered
file writes.
private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>>
input) {
- checkArgument(numFileShards > 0);
Pipeline p = input.getPipeline();
final PCollectionView<String> loadJobIdPrefixView =
createJobIdPrefixView(p, JobType.LOAD);
final PCollectionView<String> copyJobIdPrefixView =
createJobIdPrefixView(p, JobType.COPY);
final PCollectionView<String> tempFilePrefixView =
createTempFilePrefixView(p, loadJobIdPrefixView);
- // The user-supplied triggeringDuration is often chosen to control how
many BigQuery load
- // jobs are generated, to prevent going over BigQuery's daily quota for
load jobs. If this
- // is set to a large value, currently we have to buffer all the data until
the trigger fires.
- // Instead we ensure that the files are written if a threshold number of
records are ready.
- // We use only the user-supplied trigger on the actual BigQuery load. This
allows us to
- // offload the data to the filesystem.
- PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
- input.apply(
- "rewindowIntoGlobal",
- Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
- .triggering(
- Repeatedly.forever(
- AfterFirst.of(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(triggeringFrequency),
-
AfterPane.elementCountAtLeast(FILE_TRIGGERING_RECORD_COUNT))))
- .discardingFiredPanes());
- PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
- writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
+ PCollection<WriteBundlesToFiles.Result<DestinationT>> results;
+ if (numFileShards > 0) {
+ // The user-supplied triggeringFrequency is often chosen to control how
many BigQuery load
+ // jobs are generated, to prevent going over BigQuery's daily quota for
load jobs. If this
+ // is set to a large value, currently we have to buffer all the data
until the trigger fires.
+ // Instead we ensure that the files are written if a threshold number of
records are ready.
+ // We use only the user-supplied trigger on the actual BigQuery load.
This allows us to
+ // offload the data to the filesystem.
+ PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
+ input.apply(
+ "rewindowIntoGlobal",
+ Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
+ .triggering(
+ Repeatedly.forever(
+ AfterFirst.of(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(triggeringFrequency),
+
AfterPane.elementCountAtLeast(FILE_TRIGGERING_RECORD_COUNT))))
+ .discardingFiredPanes());
+ results = writeStaticallyShardedFiles(inputInGlobalWindow,
tempFilePrefixView);
+ } else {
+ // In the case of dynamic sharding, however, we use a default triggering
and instead apply the
+ // user supplied triggeringFrequency to the sharding transform. See
+ // writeDynamicallyShardedFilesTriggered.
+ PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
+ input.apply(
+ "rewindowIntoGlobal",
+ Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
+ .triggering(DefaultTrigger.of())
+ .discardingFiredPanes());
+ results = writeDynamicallyShardedFilesTriggered(inputInGlobalWindow,
tempFilePrefixView);
+ }
+
// Apply the user's trigger before we start generating BigQuery load jobs.
results =
results.apply(
@@ -307,7 +323,7 @@ class BatchLoads<DestinationT, ElementT>
new TupleTag<>("singlePartitionTag");
// If we have non-default triggered output, we can't use the side-input
technique used in
- // expandUntriggered . Instead make the result list a main input. Apply a
GroupByKey first for
+ // expandUntriggered. Instead make the result list a main input. Apply a
GroupByKey first for
// determinism.
PCollectionTuple partitions =
results
@@ -371,8 +387,8 @@ class BatchLoads<DestinationT, ElementT>
.discardingFiredPanes());
PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
(numFileShards == 0)
- ? writeDynamicallyShardedFiles(inputInGlobalWindow,
tempFilePrefixView)
- : writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
+ ? writeDynamicallyShardedFilesUntriggered(inputInGlobalWindow,
tempFilePrefixView)
+ : writeStaticallyShardedFiles(inputInGlobalWindow,
tempFilePrefixView);
TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
new TupleTag<KV<ShardedKey<DestinationT>,
List<String>>>("multiPartitionsTag") {};
@@ -470,9 +486,10 @@ class BatchLoads<DestinationT, ElementT>
.apply("TempFilePrefixView", View.asSingleton());
}
- // Writes input data to dynamically-sharded, per-bundle files. Returns a
PCollection of filename,
- // file byte size, and table destination.
- PCollection<WriteBundlesToFiles.Result<DestinationT>>
writeDynamicallyShardedFiles(
+ // Writes input data to dynamically-sharded per-bundle files without
triggering. Input records are
+ // spilled to new files if memory is constrained. Returns a PCollection of
filename, file byte size,
+ // and table destination.
+ PCollection<WriteBundlesToFiles.Result<DestinationT>>
writeDynamicallyShardedFilesUntriggered(
PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String>
tempFilePrefix) {
TupleTag<WriteBundlesToFiles.Result<DestinationT>> writtenFilesTag =
new TupleTag<WriteBundlesToFiles.Result<DestinationT>>("writtenFiles")
{};
@@ -513,9 +530,9 @@ class BatchLoads<DestinationT, ElementT>
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
}
- // Writes input data to statically-sharded files. Returns a PCollection of
filename,
- // file byte size, and table destination.
- PCollection<WriteBundlesToFiles.Result<DestinationT>> writeShardedFiles(
+ // Writes input data to statically-sharded files. Returns a PCollection of
filename, file byte
+ // size, and table destination.
+ PCollection<WriteBundlesToFiles.Result<DestinationT>>
writeStaticallyShardedFiles(
PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String>
tempFilePrefix) {
checkState(numFileShards > 0);
PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords =
@@ -547,12 +564,67 @@ class BatchLoads<DestinationT, ElementT>
return writeShardedRecords(shardedRecords, tempFilePrefix);
}
+ // Writes input data to dynamically-sharded files with triggering. The input
data is sharded by
+ // table destinations and each destination may be sub-sharded dynamically.
Returns a PCollection
+ // of filename, file byte size, and table destination.
+ PCollection<WriteBundlesToFiles.Result<DestinationT>>
writeDynamicallyShardedFilesTriggered(
+ PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String>
tempFilePrefix) {
+ // In contrast to fixed sharding with triggering, here we use a global
window with default
+ // trigger and apply the user supplied triggeringFrequency in the
subsequent GroupIntoBatches
+ // transform. We also ensure that the files are written if a threshold
number of records are
+ // ready. Dynamic sharding is achieved via the withShardedKey() option
provided by
+ // GroupIntoBatches.
+ return input
+ .apply(
+ GroupIntoBatches.<DestinationT,
ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
+ .withMaxBufferingDuration(triggeringFrequency)
+ .withShardedKey())
+ .setCoder(
+ KvCoder.of(
+ org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
+ IterableCoder.of(elementCoder)))
+ .apply(
+ "StripShardId",
+ MapElements.via(
+ new SimpleFunction<
+ KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>,
Iterable<ElementT>>,
+ KV<DestinationT, Iterable<ElementT>>>() {
+ @Override
+ public KV<DestinationT, Iterable<ElementT>> apply(
+ KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>,
Iterable<ElementT>>
+ input) {
+ return KV.of(input.getKey().getKey(), input.getValue());
+ }
+ }))
+ .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
+ .apply(
+ "WriteGroupedRecords",
+ ParDo.of(
+ new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
+ tempFilePrefix, maxFileSize, rowWriterFactory))
+ .withSideInputs(tempFilePrefix))
+ .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
+ }
+
private PCollection<Result<DestinationT>> writeShardedRecords(
PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords,
PCollectionView<String> tempFilePrefix) {
return shardedRecords
.apply("GroupByDestination", GroupByKey.create())
.apply(
+ "StripShardId",
+ MapElements.via(
+ new SimpleFunction<
+ KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
+ KV<DestinationT, Iterable<ElementT>>>() {
+ @Override
+ public KV<DestinationT, Iterable<ElementT>> apply(
+ KV<ShardedKey<DestinationT>, Iterable<ElementT>> input) {
+ return KV.of(input.getKey().getKey(), input.getValue());
+ }
+ }))
+ .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
+ .apply(
"WriteGroupedRecords",
ParDo.of(
new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 1869c91..1e789c1 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2267,7 +2267,8 @@ public class BigQueryIO {
/**
* Control how many file shards are written when using BigQuery load jobs.
Applicable only when
- * also setting {@link #withTriggeringFrequency}.
* also setting {@link #withTriggeringFrequency}. To let the runner determine
the sharding at
+ * runtime, set {@link #withAutoSharding()} instead.
*/
public Write<T> withNumFileShards(int numFileShards) {
checkArgument(numFileShards > 0, "numFileShards must be > 0, but was:
%s", numFileShards);
@@ -2350,10 +2351,10 @@ public class BigQueryIO {
}
/**
- * If true, enables dynamically determined number of shards to write to
BigQuery. Only
- * applicable to unbounded data with STREAMING_INSERTS.
- *
- * <p>TODO(BEAM-11408): Also integrate this option to FILE_LOADS.
+ * If true, enables using a dynamically determined number of shards to
write to BigQuery. This
+ * can be used for both {@link Method#FILE_LOADS} and {@link
Method#STREAMING_INSERTS}. Only
+ * applicable to unbounded data. If using {@link Method#FILE_LOADS},
numFileShards set via
+ * {@link #withNumFileShards} will be ignored.
*/
@Experimental
public Write<T> withAutoSharding() {
@@ -2751,7 +2752,11 @@ public class BigQueryIO {
batchLoads.setMaxRetryJobs(1000);
}
batchLoads.setTriggeringFrequency(getTriggeringFrequency());
- batchLoads.setNumFileShards(getNumFileShards());
+ if (getAutoSharding()) {
+ batchLoads.setNumFileShards(0);
+ } else {
+ batchLoads.setNumFileShards(getNumFileShards());
+ }
return input.apply(batchLoads);
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
index 6db179b..8c6366d 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
@@ -20,17 +20,14 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.ShardedKey;
/**
- * Receives elements grouped by their (sharded) destination, and writes them
out to a file. Since
- * all the elements in the {@link Iterable} are destined to the same table,
they are all written to
- * the same file. Ensures that only one {@link TableRowWriter} is active per
bundle.
+ * Receives elements grouped by their destination, and writes them out to a
file. Since all the
+ * elements in the {@link Iterable} are destined to the same table, they are
all written to the same
+ * file. Ensures that only one {@link TableRowWriter} is active per bundle.
*/
class WriteGroupedRecordsToFiles<DestinationT, ElementT>
- extends DoFn<
- KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
- WriteBundlesToFiles.Result<DestinationT>> {
+ extends DoFn<KV<DestinationT, Iterable<ElementT>>,
WriteBundlesToFiles.Result<DestinationT>> {
private final PCollectionView<String> tempFilePrefix;
private final long maxFileSize;
@@ -48,24 +45,24 @@ class WriteGroupedRecordsToFiles<DestinationT, ElementT>
@ProcessElement
public void processElement(
ProcessContext c,
- @Element KV<ShardedKey<DestinationT>, Iterable<ElementT>> element,
+ @Element KV<DestinationT, Iterable<ElementT>> element,
OutputReceiver<WriteBundlesToFiles.Result<DestinationT>> o)
throws Exception {
String tempFilePrefix = c.sideInput(this.tempFilePrefix);
BigQueryRowWriter<ElementT> writer =
- rowWriterFactory.createRowWriter(tempFilePrefix,
element.getKey().getKey());
+ rowWriterFactory.createRowWriter(tempFilePrefix, element.getKey());
try {
for (ElementT tableRow : element.getValue()) {
if (writer.getByteSize() > maxFileSize) {
writer.close();
- writer = rowWriterFactory.createRowWriter(tempFilePrefix,
element.getKey().getKey());
+ writer = rowWriterFactory.createRowWriter(tempFilePrefix,
element.getKey());
BigQueryRowWriter.Result result = writer.getResult();
o.output(
new WriteBundlesToFiles.Result<>(
- result.resourceId.toString(), result.byteSize,
c.element().getKey().getKey()));
+ result.resourceId.toString(), result.byteSize,
c.element().getKey()));
}
writer.write(tableRow);
}
@@ -76,6 +73,6 @@ class WriteGroupedRecordsToFiles<DestinationT, ElementT>
BigQueryRowWriter.Result result = writer.getResult();
o.output(
new WriteBundlesToFiles.Result<>(
- result.resourceId.toString(), result.byteSize,
c.element().getKey().getKey()));
+ result.resourceId.toString(), result.byteSize,
c.element().getKey()));
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 7534a86..bfd6418 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -32,6 +32,7 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
@@ -62,8 +63,12 @@ public class FakeDatasetService implements DatasetService,
Serializable {
Map<String, List<String>> insertErrors = Maps.newHashMap();
+ // The counter for the number of insertions performed.
+ static AtomicInteger insertCount;
+
public static void setUp() {
tables = HashBasedTable.create();
+ insertCount = new AtomicInteger(0);
FakeJobService.setUp();
}
@@ -217,6 +222,10 @@ public class FakeDatasetService implements DatasetService,
Serializable {
}
}
+ public int getInsertCount() {
+ return insertCount.get();
+ }
+
public long insertAll(
TableReference ref, List<TableRow> rowList, @Nullable List<String>
insertIdList)
throws IOException, InterruptedException {
@@ -292,6 +301,7 @@ public class FakeDatasetService implements DatasetService,
Serializable {
failedInserts, allErrors.get(allErrors.size() - 1), ref,
rowList.get(i));
}
}
+ insertCount.addAndGet(1);
return dataSize;
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index d327ece..8d2f07a 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -57,6 +57,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -605,6 +606,74 @@ public class BigQueryIOWriteTest implements Serializable {
}
@Test
+ public void testTriggeredFileLoadsWithAutoSharding() throws Exception {
+ List<TableRow> elements = Lists.newArrayList();
+ for (int i = 0; i < 30; ++i) {
+ elements.add(new TableRow().set("number", i));
+ }
+
+ Instant startInstant = new Instant(0L);
+ TestStream<TableRow> testStream =
+ TestStream.create(TableRowJsonCoder.of())
+ // Initialize watermark for timer to be triggered correctly.
+ .advanceWatermarkTo(startInstant)
+ .addElements(
+ elements.get(0), Iterables.toArray(elements.subList(1, 10),
TableRow.class))
+ .advanceProcessingTime(Duration.standardMinutes(1))
+
.advanceWatermarkTo(startInstant.plus(Duration.standardSeconds(10)))
+ .addElements(
+ elements.get(10), Iterables.toArray(elements.subList(11, 20),
TableRow.class))
+ .advanceProcessingTime(Duration.standardMinutes(1))
+
.advanceWatermarkTo(startInstant.plus(Duration.standardSeconds(30)))
+ .addElements(
+ elements.get(20), Iterables.toArray(elements.subList(21, 30),
TableRow.class))
+ .advanceProcessingTime(Duration.standardMinutes(2))
+ .advanceWatermarkToInfinity();
+
+ int numTables = 3;
+ p.apply(testStream)
+ .apply(
+ BigQueryIO.writeTableRows()
+ .to(
+ (ValueInSingleWindow<TableRow> vsw) -> {
+ String tableSpec =
+ "project-id:dataset-id.table-"
+ + ((int) vsw.getValue().get("number") %
numTables);
+ return new TableDestination(tableSpec, null);
+ })
+ .withSchema(
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new
TableFieldSchema().setName("number").setType("INTEGER"))))
+ .withTestServices(fakeBqServices)
+ // Set a triggering frequency without needing to also specify
numFileShards when
+ // using autoSharding.
+ .withTriggeringFrequency(Duration.standardSeconds(100))
+ .withAutoSharding()
+ .withMaxBytesPerPartition(1000)
+ .withMaxFilesPerPartition(10)
+ .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
+ .withoutValidation());
+ p.run();
+
+ Map<Integer, List<TableRow>> elementsByTableIdx = new HashMap<>();
+ for (int i = 0; i < elements.size(); i++) {
+ elementsByTableIdx
+ .computeIfAbsent(i % numTables, k -> new ArrayList<>())
+ .add(elements.get(i));
+ }
+ for (Map.Entry<Integer, List<TableRow>> entry :
elementsByTableIdx.entrySet()) {
+ assertThat(
+ fakeDatasetService.getAllRows("project-id", "dataset-id", "table-" +
entry.getKey()),
+ containsInAnyOrder(Iterables.toArray(entry.getValue(),
TableRow.class)));
+ }
+ // For each table destination, it's expected to create two load jobs based
on the triggering
+ // frequency and processing time intervals.
+ assertEquals(2 * numTables, fakeDatasetService.getInsertCount());
+ }
+
+ @Test
public void testFailuresNoRetryPolicy() throws Exception {
TableRow row1 = new TableRow().set("name", "a").set("number", "1");
TableRow row2 = new TableRow().set("name", "b").set("number", "2");