egalpin commented on a change in pull request #14347:
URL: https://github.com/apache/beam/pull/14347#discussion_r617016322
##########
File path:
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1177,12 +1272,513 @@ public Write withTypeFn(FieldValueExtractFn typeFn) {
* Elasticsearch.
*
* @param usePartialUpdate set to true to issue partial updates
- * @return the {@link Write} with the partial update control set
+ * @return the {@link DocToBulk} with the partial update control set
*/
- public Write withUsePartialUpdate(boolean usePartialUpdate) {
+ public DocToBulk withUsePartialUpdate(boolean usePartialUpdate) {
return builder().setUsePartialUpdate(usePartialUpdate).build();
}
+ /**
+ * Whether to use scripted updates and what script to use.
+ *
+ * @param source set to the value of the script source, painless lang
+ * @return the {@link DocToBulk} with the scripted updates set
+ */
+ public DocToBulk withUpsertScript(String source) {
+ return
builder().setUsePartialUpdate(false).setUpsertScript(source).build();
+ }
+
+ /**
+ * Provide a function to extract the doc version from the document. This
version number will be
+ * used as the document version in Elasticsearch. Should the function
throw an Exception then
+ * the batch will fail and the exception propagated. Incompatible with
update operations and
+ * should only be used with withUsePartialUpdate(false)
+ *
+ * @param docVersionFn to extract the document version
+ * @return the {@link DocToBulk} with the function set
+ */
+ public DocToBulk withDocVersionFn(Write.FieldValueExtractFn docVersionFn) {
+ checkArgument(docVersionFn != null, "docVersionFn must not be null");
+ return builder().setDocVersionFn(docVersionFn).build();
+ }
+
+ /**
+ * Provide a function to extract the target operation either upsert or
delete from the document
+ * fields allowing dynamic bulk operation decision. While using
withIsDeleteFn, it should be
+ * taken care that the document's id extraction is defined using the
withIdFn function or else
+ * IllegalArgumentException is thrown. Should the function throw an
Exception then the batch
+ * will fail and the exception propagated.
+ *
+ * @param isDeleteFn set to true for deleting the specific document
+ * @return the {@link Write} with the function set
+ */
+ public DocToBulk withIsDeleteFn(Write.BooleanFieldValueExtractFn
isDeleteFn) {
+ checkArgument(isDeleteFn != null, "deleteFn is required");
+ return builder().setIsDeleteFn(isDeleteFn).build();
+ }
+
+ /**
+ * Provide a function to extract the doc version from the document. This
version number will be
+ * used as the document version in Elasticsearch. Should the function
throw an Exception then
+ * the batch will fail and the exception propagated. Incompatible with
update operations and
+ * should only be used with withUsePartialUpdate(false)
+ *
+ * @param docVersionType the version type to use, one of {@value
ElasticsearchIO#VERSION_TYPES}
+ * @return the {@link DocToBulk} with the doc version type set
+ */
+ public DocToBulk withDocVersionType(String docVersionType) {
+ checkArgument(
+ VERSION_TYPES.contains(docVersionType),
+ "docVersionType must be one of " + "%s",
+ String.join(", ", VERSION_TYPES));
+ return builder().setDocVersionType(docVersionType).build();
+ }
+
+ /**
+ * Use to set explicitly which version of Elasticsearch the destination
cluster is running.
+ * Providing this hint means there is no need for setting {@link
+ * DocToBulk#withConnectionConfiguration}. This can also be very useful
for testing purposes.
+ *
+ * @param backendVersion the major version number of the version of
Elasticsearch being run in
+ * the cluster where documents will be indexed.
+ * @return the {@link DocToBulk} with the Elasticsearch major version
number set
+ */
+ public DocToBulk withBackendVersion(int backendVersion) {
+ checkArgument(
+ VALID_CLUSTER_VERSIONS.contains(backendVersion),
+ "Backend version may only be one of " + "%s",
+ String.join(", ", VERSION_TYPES));
+ return builder().setBackendVersion(backendVersion).build();
+ }
+
+ @Override
+ public PCollection<String> expand(PCollection<String> docs) {
+ ConnectionConfiguration connectionConfiguration =
getConnectionConfiguration();
+ Integer backendVersion = getBackendVersion();
+ Write.FieldValueExtractFn idFn = getIdFn();
+ Write.BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
+ checkState(
+ (backendVersion != null || connectionConfiguration != null),
+ "withBackendVersion() or withConnectionConfiguration() is required");
+ checkArgument(
+ isDeleteFn == null || idFn != null,
+ "Id needs to be specified by withIdFn for delete operation");
+
+ return docs.apply(ParDo.of(new DocToBulkFn(this)));
+ }
+
+ // Encapsulates the elements which form the metadata for an Elasticsearch
bulk operation
+ private static class DocumentMetadata implements Serializable {
+ final String index;
+ final String type;
+ final String id;
+ final Integer retryOnConflict;
+ final String routing;
+ final Integer backendVersion;
+ final String version;
+ final String versionType;
+
+ DocumentMetadata(
+ String index,
+ String type,
+ String id,
+ Integer retryOnConflict,
+ String routing,
+ Integer backendVersion,
+ String version,
+ String versionType) {
+ this.index = index;
+ this.id = id;
+ this.type = type;
+ this.retryOnConflict = retryOnConflict;
+ this.routing = routing;
+ this.backendVersion = backendVersion;
+ this.version = version;
+ this.versionType = versionType;
+ }
+ }
+
+ private static class DocumentMetadataSerializer extends
StdSerializer<DocumentMetadata> {
+ private DocumentMetadataSerializer() {
+ super(DocumentMetadata.class);
+ }
+
+ @Override
+ public void serialize(DocumentMetadata value, JsonGenerator gen,
SerializerProvider provider)
+ throws IOException {
+ gen.writeStartObject();
+ if (value.index != null) {
+ gen.writeStringField("_index", value.index);
+ }
+ if (value.type != null) {
+ gen.writeStringField("_type", value.type);
+ }
+ if (value.id != null) {
+ gen.writeStringField("_id", value.id);
+ }
+ if (value.routing != null) {
+ gen.writeStringField("routing", value.routing);
+ }
+ if (value.retryOnConflict != null && value.backendVersion <= 6) {
+ gen.writeNumberField("_retry_on_conflict", value.retryOnConflict);
+ }
+ if (value.retryOnConflict != null && value.backendVersion >= 7) {
+ gen.writeNumberField("retry_on_conflict", value.retryOnConflict);
+ }
+ if (value.version != null) {
+ gen.writeStringField("version", value.version);
+ }
+ if (value.versionType != null) {
+ gen.writeStringField("version_type", value.versionType);
+ }
+ gen.writeEndObject();
+ }
+ }
+
+ @VisibleForTesting
+ static String createBulkApiEntity(DocToBulk spec, String document, int
backendVersion)
+ throws IOException {
+ String documentMetadata = "{}";
+ boolean isDelete = false;
+ if (spec.getIndexFn() != null || spec.getTypeFn() != null ||
spec.getIdFn() != null) {
+ // parse once and reused for efficiency
+ JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
+ documentMetadata = getDocumentMetadata(spec, parsedDocument,
backendVersion);
+ if (spec.getIsDeleteFn() != null) {
+ isDelete = spec.getIsDeleteFn().apply(parsedDocument);
+ }
+ }
+
+ if (isDelete) {
+ // delete request used for deleting a document
+ return String.format("{ \"delete\" : %s }%n", documentMetadata);
+ } else {
+ // index is an insert/upsert and update is a partial update (or insert
if not
+ // existing)
+ if (spec.getUsePartialUpdate()) {
+ return String.format(
+ "{ \"update\" : %s }%n{ \"doc\" : %s, " + "\"doc_as_upsert\" :
true }%n",
+ documentMetadata, document);
+ } else if (spec.getUpsertScript() != null) {
+ return String.format(
+ "{ \"update\" : %s }%n{ \"script\" : {\"source\": \"%s\", "
+ + "\"params\": %s}, \"upsert\" : %s }%n",
+ documentMetadata, spec.getUpsertScript(), document, document);
+ } else {
+ return String.format("{ \"index\" : %s }%n%s%n", documentMetadata,
document);
+ }
+ }
+ }
+
+ private static String lowerCaseOrNull(String input) {
+ return input == null ? null : input.toLowerCase();
+ }
+
+ /**
+ * Extracts the components that comprise the document address from the
document using the {@link
+ * Write.FieldValueExtractFn} configured. This allows any or all of the
index, type and document
+ * id to be controlled on a per document basis. If none are provided then
an empty default of
+ * {@code {}} is returned. Sanitization of the index is performed,
automatically lower-casing
+ * the value as required by Elasticsearch.
+ *
+ * @param parsedDocument the json from which the index, type and id may be
extracted
+ * @return the document address as JSON or the default
+ * @throws IOException if the document cannot be parsed as JSON
+ */
+ private static String getDocumentMetadata(
+ DocToBulk spec, JsonNode parsedDocument, int backendVersion) throws
IOException {
+ DocumentMetadata metadata =
+ new DocumentMetadata(
+ spec.getIndexFn() != null
+ ? lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument))
+ : null,
+ spec.getTypeFn() != null ?
spec.getTypeFn().apply(parsedDocument) : null,
+ spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument) :
null,
+ (spec.getUsePartialUpdate()
+ || (spec.getUpsertScript() != null &&
!spec.getUpsertScript().isEmpty()))
+ ? DEFAULT_RETRY_ON_CONFLICT
+ : null,
+ spec.getRoutingFn() != null ?
spec.getRoutingFn().apply(parsedDocument) : null,
+ backendVersion,
+ spec.getDocVersionFn() != null ?
spec.getDocVersionFn().apply(parsedDocument) : null,
+ spec.getDocVersionType());
+ return OBJECT_MAPPER.writeValueAsString(metadata);
+ }
+
+ /** {@link DoFn} to for the {@link DocToBulk} transform. */
+ @VisibleForTesting
+ static class DocToBulkFn extends DoFn<String, String> {
+ private final DocToBulk spec;
+ private int backendVersion;
+
+ public DocToBulkFn(DocToBulk spec) {
+ this.spec = spec;
+ }
+
+ @Setup
+ public void setup() throws IOException {
+ ConnectionConfiguration connectionConfiguration =
spec.getConnectionConfiguration();
+ if (spec.getBackendVersion() == null) {
+ backendVersion =
ElasticsearchIO.getBackendVersion(connectionConfiguration);
+ } else {
+ backendVersion = spec.getBackendVersion();
+ }
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws IOException {
+ c.output(createBulkApiEntity(spec, c.element(), backendVersion));
+ }
+ }
+ }
+
+ /**
+ * A {@link PTransform} convenience wrapper for doing both document to bulk
API serialization as
+ * well as batching those Bulk API entities and writing them to an
Elasticsearch cluster. This
+ * class is effectively a thin proxy for DocToBulk->BulkIO all-in-one for
convenience and backward
+ * compatibility.
+ */
+ @AutoValue
+ public abstract static class Write extends PTransform<PCollection<String>,
PDone> {
+ public interface FieldValueExtractFn extends
SerializableFunction<JsonNode, String> {}
+
+ public interface BooleanFieldValueExtractFn extends
SerializableFunction<JsonNode, Boolean> {}
+
+ public abstract DocToBulk getDocToBulk();
+
+ public abstract BulkIO getBulkIO();
+
+ abstract Builder writeBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setDocToBulk(DocToBulk docToBulk);
+
+ abstract Builder setBulkIO(BulkIO bulkIO);
+
+ abstract Write build();
+ }
+
+ // For building Doc2Bulk
+ /** Refer to {@link DocToBulk#withIdFn}. */
+ public Write withIdFn(FieldValueExtractFn idFn) {
+ return
writeBuilder().setDocToBulk(getDocToBulk().withIdFn(idFn)).build();
+ }
+
+ /** Refer to {@link DocToBulk#withIndexFn}. */
+ public Write withIndexFn(FieldValueExtractFn indexFn) {
+ return
writeBuilder().setDocToBulk(getDocToBulk().withIndexFn(indexFn)).build();
+ }
+
+ /** Refer to {@link DocToBulk#withRoutingFn}. */
+ public Write withRoutingFn(FieldValueExtractFn routingFn) {
+ return
writeBuilder().setDocToBulk(getDocToBulk().withRoutingFn(routingFn)).build();
+ }
+
+ /** Refer to {@link DocToBulk#withTypeFn}. */
+ public Write withTypeFn(FieldValueExtractFn typeFn) {
+ return
writeBuilder().setDocToBulk(getDocToBulk().withTypeFn(typeFn)).build();
+ }
+
+ /** Refer to {@link DocToBulk#withDocVersionFn}. */
+ public Write withDocVersionFn(FieldValueExtractFn docVersionFn) {
+ return
writeBuilder().setDocToBulk(getDocToBulk().withDocVersionFn(docVersionFn)).build();
+ }
+
+ /** Refer to {@link DocToBulk#withDocVersionType}. */
+ public Write withDocVersionType(String docVersionType) {
+ return
writeBuilder().setDocToBulk(getDocToBulk().withDocVersionType(docVersionType)).build();
+ }
+
+ /** Refer to {@link DocToBulk#withUsePartialUpdate}. */
+ public Write withUsePartialUpdate(boolean usePartialUpdate) {
+ return writeBuilder()
+ .setDocToBulk(getDocToBulk().withUsePartialUpdate(usePartialUpdate))
+ .build();
+ }
+
+ /** Refer to {@link DocToBulk#withUpsertScript}. */
+ public Write withUpsertScript(String source) {
+ return
writeBuilder().setDocToBulk(getDocToBulk().withUpsertScript(source)).build();
+ }
+
+ /** Refer to {@link DocToBulk#withBackendVersion}. */
+ public Write withBackendVersion(int backendVersion) {
+ return
writeBuilder().setDocToBulk(getDocToBulk().withBackendVersion(backendVersion)).build();
+ }
+
+ /** Refer to {@link DocToBulk#withIsDeleteFn}. */
+ public Write withIsDeleteFn(Write.BooleanFieldValueExtractFn isDeleteFn) {
+ return
writeBuilder().setDocToBulk(getDocToBulk().withIsDeleteFn(isDeleteFn)).build();
+ }
+ // End building Doc2Bulk
+
+ /** Refer to {@link BulkIO#withConnectionConfiguration}. */
+ public Write withConnectionConfiguration(ConnectionConfiguration
connectionConfiguration) {
+ checkArgument(connectionConfiguration != null, "connectionConfiguration
can not be null");
+
+ return writeBuilder()
+
.setDocToBulk(getDocToBulk().withConnectionConfiguration(connectionConfiguration))
+
.setBulkIO(getBulkIO().withConnectionConfiguration(connectionConfiguration))
+ .build();
+ }
+
+ /** Refer to {@link BulkIO#withMaxBatchSize}. */
+ public Write withMaxBatchSize(long batchSize) {
+ return
writeBuilder().setBulkIO(getBulkIO().withMaxBatchSize(batchSize)).build();
+ }
+
+ /** Refer to {@link BulkIO#withMaxBatchSizeBytes}. */
+ public Write withMaxBatchSizeBytes(long batchSizeBytes) {
+ return
writeBuilder().setBulkIO(getBulkIO().withMaxBatchSizeBytes(batchSizeBytes)).build();
+ }
+
+ /** Refer to {@link BulkIO#withRetryConfiguration}. */
+ public Write withRetryConfiguration(RetryConfiguration retryConfiguration)
{
+ return writeBuilder()
+ .setBulkIO(getBulkIO().withRetryConfiguration(retryConfiguration))
+ .build();
+ }
+
+ /** Refer to {@link BulkIO#withIgnoreVersionConflicts}. */
+ public Write withIgnoreVersionConflicts(boolean ignoreVersionConflicts) {
+ return writeBuilder()
+
.setBulkIO(getBulkIO().withIgnoreVersionConflicts(ignoreVersionConflicts))
+ .build();
+ }
+
+ /** Refer to {@link BulkIO#withUseStatefulBatches}. */
+ public Write withUseStatefulBatches(boolean useStatefulBatches) {
+ return writeBuilder()
+ .setBulkIO(getBulkIO().withUseStatefulBatches(useStatefulBatches))
+ .build();
+ }
+
+ /** Refer to {@link BulkIO#withMaxBufferingDuration}. */
+ public Write withMaxBufferingDuration(Duration maxBufferingDuration) {
+ return writeBuilder()
+
.setBulkIO(getBulkIO().withMaxBufferingDuration(maxBufferingDuration))
+ .build();
+ }
+
+ /** Refer to {@link BulkIO#withMaxParallelRequestsPerWindow}. */
+ public Write withMaxParallelRquestsPerWindow(int
maxParallelRquestsPerWindow) {
+ return writeBuilder()
+
.setBulkIO(getBulkIO().withMaxParallelRequestsPerWindow(maxParallelRquestsPerWindow))
+ .build();
+ }
+
+ /** Refer to {@link BulkIO#withAllowableResponseErrors}. */
+ public Write withAllowableResponseErrors(@Nullable Set<String>
allowableResponseErrors) {
Review comment:
This was added for a specific use case involving document versioning, in
particular where the version ID increases monotonically for new versions of a
given document, i.e., a higher version ID always means a newer doc.
In that case, it doesn't matter if an older document arrives at an ES
cluster after a document with a larger version ID has already been indexed for
the same doc ID. Ignoring version conflicts therefore allows batches containing
out-of-date documents to complete without raising an error and getting stuck
in an infinite retry loop.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]