prateekm commented on a change in pull request #1161: Transactional State
[2/5]: Added a ChangelogSSPIterator API
URL: https://github.com/apache/samza/pull/1161#discussion_r330340200
##########
File path:
samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
##########
@@ -109,42 +105,89 @@ class KeyValueStorageEngine[K, V](
}
/**
- * Restore the contents of this key/value store from the change log,
- * batching updates to underlying raw store to avoid wrapping
functions for efficiency.
+ * Restore the contents of this key/value store from the change log,
batching updates to underlying raw store
+ * for efficiency.
+ *
+ * With transactional state disabled, iterator mode will always be
'restore'. With transactional state enabled,
+ * iterator mode may switch from 'restore' to 'trim' at some point, but will
not switch back to 'restore'.
*/
- def restore(envelopes: java.util.Iterator[IncomingMessageEnvelope]) {
+ def restore(iterator: ChangelogSSPIterator) {
info("Restoring entries for store: " + storeName + " in directory: " +
storeDir.toString)
+ var restoredMessages = 0
+ var restoredBytes = 0
+ var trimmedMessages = 0
+ var trimmedBytes = 0
val batch = new java.util.ArrayList[Entry[Array[Byte],
Array[Byte]]](batchSize)
+ var lastBatchFlushed = false
- for (envelope <- envelopes.asScala) {
+ while(iterator.hasNext) {
+ val envelope = iterator.next()
val keyBytes = envelope.getKey.asInstanceOf[Array[Byte]]
val valBytes = envelope.getMessage.asInstanceOf[Array[Byte]]
-
- batch.add(new Entry(keyBytes, valBytes))
-
- if (batch.size >= batchSize) {
- doPutAll(rawStore, batch)
- batch.clear()
- }
-
- if (valBytes != null) {
- metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue +
valBytes.length)
+ val mode = iterator.getMode
+
+ if (mode.equals(ChangelogSSPIterator.Mode.RESTORE)) {
+ batch.add(new Entry(keyBytes, valBytes))
Review comment:
Will fix in PR 5/5. SAMZA-2337 to track.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services